AMBARI-6887. Alerts: groundwork for alert collection (ncole)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/14e79ed1 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/14e79ed1 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/14e79ed1 Branch: refs/heads/branch-alerts-dev Commit: 14e79ed1fbd6b2891e1b66163c61eb360a4c6c38 Parents: 8e48128 Author: Nate Cole <nc...@hortonworks.com> Authored: Sun Aug 17 19:26:50 2014 -0400 Committer: Jonathan Hurley <jhur...@hortonworks.com> Committed: Wed Aug 20 10:50:58 2014 -0400 ---------------------------------------------------------------------- ambari-agent/pom.xml | 1 + .../ambari_agent/AlertSchedulerHandler.py | 127 ++++ .../src/main/python/ambari_agent/Controller.py | 12 +- .../main/python/ambari_agent/alerts/__init__.py | 18 + .../python/ambari_agent/alerts/base_alert.py | 72 +++ .../python/ambari_agent/alerts/port_alert.py | 96 +++ .../python/ambari_agent/apscheduler/__init__.py | 3 + .../python/ambari_agent/apscheduler/events.py | 64 ++ .../main/python/ambari_agent/apscheduler/job.py | 137 +++++ .../apscheduler/jobstores/__init__.py | 0 .../ambari_agent/apscheduler/jobstores/base.py | 25 + .../apscheduler/jobstores/mongodb_store.py | 84 +++ .../apscheduler/jobstores/ram_store.py | 25 + .../apscheduler/jobstores/redis_store.py | 91 +++ .../apscheduler/jobstores/shelve_store.py | 74 +++ .../apscheduler/jobstores/sqlalchemy_store.py | 91 +++ .../ambari_agent/apscheduler/scheduler.py | 607 +++++++++++++++++++ .../ambari_agent/apscheduler/threadpool.py | 133 ++++ .../apscheduler/triggers/__init__.py | 3 + .../apscheduler/triggers/cron/__init__.py | 144 +++++ .../apscheduler/triggers/cron/expressions.py | 194 ++++++ .../apscheduler/triggers/cron/fields.py | 100 +++ .../apscheduler/triggers/interval.py | 39 ++ .../ambari_agent/apscheduler/triggers/simple.py | 17 + .../python/ambari_agent/apscheduler/util.py | 230 +++++++ .../src/test/python/ambari_agent/TestAlerts.py | 73 +++ .../dummy_files/alert_definitions.json | 46 ++ 27 files changed, 2505 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml index ebf30fa..1c3083b 100644 --- a/ambari-agent/pom.xml +++ b/ambari-agent/pom.xml @@ -628,6 +628,7 @@ <exclude>src/test/python/ambari_agent/dummy_files/*</exclude> <exclude>src/test/python/ambari_agent/dummy*.txt</exclude> <exclude>src/main/python/ambari_agent/imports.txt</exclude> + <exclude>src/main/python/ambari_agent/apscheduler/**</exclude> <exclude>**/*.erb</exclude> <exclude>**/*.json</exclude> <exclude>**/*.pydevproject</exclude> http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py new file mode 100644 index 0000000..cd0605f --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +''' +http://apscheduler.readthedocs.org/en/v2.1.2 +''' +from apscheduler.scheduler import Scheduler +from alerts.port_alert import PortAlert +import json +import logging +import sys +import time + +logger = logging.getLogger() + +class AlertSchedulerHandler(): + + def __init__(self, filename, in_minutes=True): + self.filename = filename + + config = { + 'threadpool.core_threads': 3, + 'coalesce': True, + 'standalone': False + } + + self.scheduler = Scheduler(config) + + alert_callables = self.__load_alerts() + + for _callable in alert_callables: + if in_minutes: + self.scheduler.add_interval_job(self.__make_function(_callable), + minutes=_callable.interval()) + else: + self.scheduler.add_interval_job(self.__make_function(_callable), + seconds=_callable.interval()) + + def __make_function(self, alert_def): + return lambda: alert_def.collect() + + def start(self): + if not self.scheduler is None: + self.scheduler.start() + + def stop(self): + if not self.scheduler is None: + self.scheduler.shutdown(wait=False) + self.scheduler = None + + def __load_alerts(self): + definitions = [] + try: + # FIXME make location configurable + with open(self.filename) as fp: + cluster_defs = json.load(fp) + for deflist in cluster_defs.values(): + for definition in deflist: + obj = self.__json_to_callable(definition) + if obj is not None: + definitions.append(obj) + except: + import traceback + traceback.print_exc() + pass + return definitions + + def __json_to_callable(self, json_definition): + source = json_definition['source'] + source_type = source.get('type', '') + + alert = None + + if source_type == 'METRIC': + pass + elif source_type == 'PORT': + alert = PortAlert(json_definition, source) + elif type == 'SCRIPT': + pass + + return alert + + def __json_to_meta(self, json_definition): + pass + +def main(): + args = list(sys.argv) + del args[0] + + try: + logger.setLevel(logger.debug) + except TypeError: + logger.setLevel(12) + + ash = AlertSchedulerHandler(args[0], False) + ash.start() + + i = 0 + try: + while i < 10: + time.sleep(1) + i += 1 + except KeyboardInterrupt: + pass + ash.stop() + +if __name__ == "__main__": + main() + + http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 87af939..3be54c2 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -39,7 +39,7 @@ import security from NetUtil import NetUtil import ssl from LiveStatus import LiveStatus - +from AlertSchedulerHandler import AlertSchedulerHandler logger = logging.getLogger() @@ -73,6 +73,14 @@ class Controller(threading.Thread): self.heartbeat_wait_event = threading.Event() # List of callbacks that are called at agent registration 
self.registration_listeners = [] + + # pull config directory out of config + cache_dir = config.get('agent', 'cache_dir') + if cache_dir is None: + cache_dir = '/var/lib/ambari-agent/cache' + + self.alert_scheduler_handler = AlertSchedulerHandler( + os.path.join(cache_dir, 'alerts', 'alert_definitions.json')) def __del__(self): @@ -317,6 +325,8 @@ class Controller(threading.Thread): message = registerResponse['response'] logger.info("Registration response from %s was %s", self.serverHostname, message) + self.alert_scheduler_handler.start() + if self.isRegistered: # Clearing command queue to stop executing "stale" commands # after registration http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py b/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py new file mode 100644 index 0000000..0a0e1ca --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/alerts/__init__.py @@ -0,0 +1,18 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py new file mode 100644 index 0000000..e102d56 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+''' + +import logging + +logger = logging.getLogger() + +class BaseAlert(object): + RESULT_OK = 'OK' + RESULT_WARNING = 'WARNING' + RESULT_CRITICAL = 'CRITICAL' + RESULT_UNKNOWN = 'UNKNOWN' + + def __init__(self, alert_meta, alert_source_meta): + self.alert_meta = alert_meta + self.alert_source_meta = alert_source_meta + + def interval(self): + if not self.alert_meta.has_key('interval'): + return 1 + else: + return self.alert_meta['interval'] + + def collect(self): + res = (BaseAlert.RESULT_UNKNOWN, []) + try: + res = self._collect() + except Exception as e: + res = (BaseAlert.RESULT_CRITICAL, [str(e)]) + + res_base_text = self.alert_source_meta['reporting'][res[0].lower()]['text'] + + data = {} + data['name'] = self._find_value('name') + data['state'] = res[0] + data['text'] = res_base_text.format(*res[1]) + # data['cluster'] = self._find_value('cluster') # not sure how to get this yet + data['service'] = self._find_value('service') + data['component'] = self._find_value('component') + + print str(data) + + def _find_value(self, meta_key): + if self.alert_meta.has_key(meta_key): + return self.alert_meta[meta_key] + else: + return None + + def _collect(self): + ''' + Low level function to collect alert data. The result is a tuple as: + res[0] = the result code + res[1] = the list of arguments supplied to the reporting text for the result code + ''' + raise NotImplementedError \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py new file mode 100644 index 0000000..165f890 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+''' + +import logging +import re +import socket +import time +import traceback +from alerts.base_alert import BaseAlert +from resource_management.libraries.functions.get_port_from_url import get_port_from_url + +logger = logging.getLogger() + +class PortAlert(BaseAlert): + + def __init__(self, alert_meta, alert_source_meta): + super(PortAlert, self).__init__(alert_meta, alert_source_meta) + + default_port = alert_source_meta['default_port'] + uri = alert_source_meta['uri'] + + self.port = default_port + self.host = get_host_from_url(uri) + + try: + self.port = int(get_port_from_url(uri)) + except: + traceback.print_exc() + pass + + + def _collect(self): + s = None + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(1.5) + t = time.time() + s.connect((self.host, self.port)) + millis = time.time() - t + return (self.RESULT_OK, [millis/1000, self.port]) + finally: + if s is not None: + try: + s.close() + except: + pass + +''' +See RFC3986, Appendix B +Tested on the following cases: + "192.168.54.1" + "192.168.54.2:7661 + "hdfs://192.168.54.3/foo/bar" + "ftp://192.168.54.4:7842/foo/bar" +''' +def get_host_from_url(uri): + # RFC3986, Appendix B + parts = re.findall('^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?' , uri) + + # index of parts + # scheme = 1 + # authority = 3 + # path = 4 + # query = 6 + # fragment = 8 + + host_and_port = uri + if 0 == len(parts[0][1]): + host_and_port = parts[0][4] + elif 0 == len(parts[0][2]): + host_and_port = parts[0][1] + elif parts[0][2].startswith("//"): + host_and_port = parts[0][3] + + if -1 == host_and_port.find(':'): + return host_and_port + else: + return host_and_port.split(':')[0] + http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py new file mode 100644 index 0000000..71cc53d --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/__init__.py @@ -0,0 +1,3 @@ +version_info = (2, 1, 2) +version = '.'.join(str(n) for n in version_info[:3]) +release = '.'.join(str(n) for n in version_info) http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py new file mode 100644 index 0000000..80bde8e --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/events.py @@ -0,0 +1,64 @@ +__all__ = ('EVENT_SCHEDULER_START', 'EVENT_SCHEDULER_SHUTDOWN', + 'EVENT_JOBSTORE_ADDED', 'EVENT_JOBSTORE_REMOVED', + 'EVENT_JOBSTORE_JOB_ADDED', 'EVENT_JOBSTORE_JOB_REMOVED', + 'EVENT_JOB_EXECUTED', 'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED', + 'EVENT_ALL', 'SchedulerEvent', 'JobStoreEvent', 'JobEvent') + + +EVENT_SCHEDULER_START = 1 # The scheduler was started +EVENT_SCHEDULER_SHUTDOWN = 2 # The scheduler was shut down +EVENT_JOBSTORE_ADDED = 4 # A job store was added to the scheduler +EVENT_JOBSTORE_REMOVED = 8 # A job store was removed from the scheduler +EVENT_JOBSTORE_JOB_ADDED = 16 # A job was added to a job store +EVENT_JOBSTORE_JOB_REMOVED = 32 # A job was removed from a job store +EVENT_JOB_EXECUTED = 64 # A job was executed successfully 
+EVENT_JOB_ERROR = 128 # A job raised an exception during execution +EVENT_JOB_MISSED = 256 # A job's execution was missed +EVENT_ALL = (EVENT_SCHEDULER_START | EVENT_SCHEDULER_SHUTDOWN | + EVENT_JOBSTORE_ADDED | EVENT_JOBSTORE_REMOVED | + EVENT_JOBSTORE_JOB_ADDED | EVENT_JOBSTORE_JOB_REMOVED | + EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED) + + +class SchedulerEvent(object): + """ + An event that concerns the scheduler itself. + + :var code: the type code of this event + """ + def __init__(self, code): + self.code = code + + +class JobStoreEvent(SchedulerEvent): + """ + An event that concerns job stores. + + :var alias: the alias of the job store involved + :var job: the new job if a job was added + """ + def __init__(self, code, alias, job=None): + SchedulerEvent.__init__(self, code) + self.alias = alias + if job: + self.job = job + + +class JobEvent(SchedulerEvent): + """ + An event that concerns the execution of individual jobs. + + :var job: the job instance in question + :var scheduled_run_time: the time when the job was scheduled to be run + :var retval: the return value of the successfully executed job + :var exception: the exception raised by the job + :var traceback: the traceback object associated with the exception + """ + def __init__(self, code, job, scheduled_run_time, retval=None, + exception=None, traceback=None): + SchedulerEvent.__init__(self, code) + self.job = job + self.scheduled_run_time = scheduled_run_time + self.retval = retval + self.exception = exception + self.traceback = traceback http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py new file mode 100644 index 0000000..cfc09a2 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/job.py @@ -0,0 +1,137 @@ +""" +Jobs represent scheduled tasks. +""" + +from threading import Lock +from datetime import timedelta + +from apscheduler.util import to_unicode, ref_to_obj, get_callable_name,\ + obj_to_ref + + +class MaxInstancesReachedError(Exception): + pass + + +class Job(object): + """ + Encapsulates the actual Job along with its metadata. Job instances + are created by the scheduler when adding jobs, and should not be + directly instantiated. These options can be set when adding jobs + to the scheduler (see :ref:`job_options`). 
+ + :var trigger: trigger that determines the execution times + :var func: callable to call when the trigger is triggered + :var args: list of positional arguments to call func with + :var kwargs: dict of keyword arguments to call func with + :var name: name of the job + :var misfire_grace_time: seconds after the designated run time that + the job is still allowed to be run + :var coalesce: run once instead of many times if the scheduler determines + that the job should be run more than once in succession + :var max_runs: maximum number of times this job is allowed to be + triggered + :var max_instances: maximum number of concurrently running + instances allowed for this job + :var runs: number of times this job has been triggered + :var instances: number of concurrently running instances of this job + """ + id = None + next_run_time = None + + def __init__(self, trigger, func, args, kwargs, misfire_grace_time, + coalesce, name=None, max_runs=None, max_instances=1): + if not trigger: + raise ValueError('The trigger must not be None') + if not hasattr(func, '__call__'): + raise TypeError('func must be callable') + if not hasattr(args, '__getitem__'): + raise TypeError('args must be a list-like object') + if not hasattr(kwargs, '__getitem__'): + raise TypeError('kwargs must be a dict-like object') + if misfire_grace_time <= 0: + raise ValueError('misfire_grace_time must be a positive value') + if max_runs is not None and max_runs <= 0: + raise ValueError('max_runs must be a positive value') + if max_instances <= 0: + raise ValueError('max_instances must be a positive value') + + self._lock = Lock() + + self.trigger = trigger + self.func = func + self.args = args + self.kwargs = kwargs + self.name = to_unicode(name or get_callable_name(func)) + self.misfire_grace_time = misfire_grace_time + self.coalesce = coalesce + self.max_runs = max_runs + self.max_instances = max_instances + self.runs = 0 + self.instances = 0 + + def compute_next_run_time(self, now): + if self.runs == self.max_runs: + self.next_run_time = None + else: + self.next_run_time = self.trigger.get_next_fire_time(now) + + return self.next_run_time + + def get_run_times(self, now): + """ + Computes the scheduled run times between ``next_run_time`` and ``now``. 
+ """ + run_times = [] + run_time = self.next_run_time + increment = timedelta(microseconds=1) + while ((not self.max_runs or self.runs < self.max_runs) and + run_time and run_time <= now): + run_times.append(run_time) + run_time = self.trigger.get_next_fire_time(run_time + increment) + + return run_times + + def add_instance(self): + self._lock.acquire() + try: + if self.instances == self.max_instances: + raise MaxInstancesReachedError + self.instances += 1 + finally: + self._lock.release() + + def remove_instance(self): + self._lock.acquire() + try: + assert self.instances > 0, 'Already at 0 instances' + self.instances -= 1 + finally: + self._lock.release() + + def __getstate__(self): + # Prevents the unwanted pickling of transient or unpicklable variables + state = self.__dict__.copy() + state.pop('instances', None) + state.pop('func', None) + state.pop('_lock', None) + state['func_ref'] = obj_to_ref(self.func) + return state + + def __setstate__(self, state): + state['instances'] = 0 + state['func'] = ref_to_obj(state.pop('func_ref')) + state['_lock'] = Lock() + self.__dict__ = state + + def __eq__(self, other): + if isinstance(other, Job): + return self.id is not None and other.id == self.id or self is other + return NotImplemented + + def __repr__(self): + return '<Job (name=%s, trigger=%s)>' % (self.name, repr(self.trigger)) + + def __str__(self): + return '%s (trigger: %s, next run at: %s)' % ( + self.name, str(self.trigger), str(self.next_run_time)) http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/__init__.py new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py new file mode 100644 index 0000000..f0a16dd --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/base.py @@ -0,0 +1,25 @@ +""" +Abstract base class that provides the interface needed by all job stores. +Job store methods are also documented here. 
+""" + + +class JobStore(object): + def add_job(self, job): + """Adds the given job from this store.""" + raise NotImplementedError + + def update_job(self, job): + """Persists the running state of the given job.""" + raise NotImplementedError + + def remove_job(self, job): + """Removes the given jobs from this store.""" + raise NotImplementedError + + def load_jobs(self): + """Loads jobs from this store into memory.""" + raise NotImplementedError + + def close(self): + """Frees any resources still bound to this job store.""" http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py new file mode 100644 index 0000000..3f522c2 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/mongodb_store.py @@ -0,0 +1,84 @@ +""" +Stores jobs in a MongoDB database. +""" +import logging + +from apscheduler.jobstores.base import JobStore +from apscheduler.job import Job + +try: + import cPickle as pickle +except ImportError: # pragma: nocover + import pickle + +try: + from bson.binary import Binary + from pymongo.connection import Connection +except ImportError: # pragma: nocover + raise ImportError('MongoDBJobStore requires PyMongo installed') + +logger = logging.getLogger(__name__) + + +class MongoDBJobStore(JobStore): + def __init__(self, database='apscheduler', collection='jobs', + connection=None, pickle_protocol=pickle.HIGHEST_PROTOCOL, + **connect_args): + self.jobs = [] + self.pickle_protocol = pickle_protocol + + if not database: + raise ValueError('The "database" parameter must not be empty') + if not collection: + raise ValueError('The "collection" parameter must not be empty') + + if connection: + self.connection = connection + else: + self.connection = Connection(**connect_args) + + self.collection = self.connection[database][collection] + + def add_job(self, job): + job_dict = job.__getstate__() + job_dict['trigger'] = Binary(pickle.dumps(job.trigger, + self.pickle_protocol)) + job_dict['args'] = Binary(pickle.dumps(job.args, + self.pickle_protocol)) + job_dict['kwargs'] = Binary(pickle.dumps(job.kwargs, + self.pickle_protocol)) + job.id = self.collection.insert(job_dict) + self.jobs.append(job) + + def remove_job(self, job): + self.collection.remove(job.id) + self.jobs.remove(job) + + def load_jobs(self): + jobs = [] + for job_dict in self.collection.find(): + try: + job = Job.__new__(Job) + job_dict['id'] = job_dict.pop('_id') + job_dict['trigger'] = pickle.loads(job_dict['trigger']) + job_dict['args'] = pickle.loads(job_dict['args']) + job_dict['kwargs'] = pickle.loads(job_dict['kwargs']) + job.__setstate__(job_dict) + jobs.append(job) + except Exception: + job_name = job_dict.get('name', '(unknown)') + logger.exception('Unable to restore job "%s"', job_name) + self.jobs = jobs + + def update_job(self, job): + spec = {'_id': job.id} + document = {'$set': {'next_run_time': job.next_run_time}, + '$inc': {'runs': 1}} + self.collection.update(spec, document) + + def close(self): + self.connection.disconnect() + + def __repr__(self): + connection = self.collection.database.connection + return '<%s (connection=%s)>' % (self.__class__.__name__, connection) 
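Note that the agent itself never configures a job store: AlertSchedulerHandler passes no jobstore options, so Scheduler.start() falls back to the in-memory RAMJobStore that follows. The MongoDB, Redis, shelve and SQLAlchemy stores are carried along unmodified from upstream APScheduler 2.1.2 and only need their drivers if actually instantiated. As a rough, hypothetical sketch (not part of this commit), an alternative store only has to expose a jobs list plus the four methods declared in jobstores/base.py above:

# Hypothetical illustration only; the class name and behavior are invented.
from apscheduler.jobstores.base import JobStore

class LoggingJobStore(JobStore):
    """In-memory store that prints each mutation."""

    def __init__(self):
        # the scheduler iterates over this attribute directly in _process_jobs()
        self.jobs = []

    def add_job(self, job):
        print('adding %s' % job)
        self.jobs.append(job)

    def update_job(self, job):
        pass  # nothing to persist for an in-memory store

    def remove_job(self, job):
        print('removing %s' % job)
        self.jobs.remove(job)

    def load_jobs(self):
        pass  # nothing to load

# Registering it under the 'default' alias before scheduler.start() would replace
# the implicit RAMJobStore:
#   scheduler.add_jobstore(LoggingJobStore(), 'default')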
http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py new file mode 100644 index 0000000..60458fb --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/ram_store.py @@ -0,0 +1,25 @@ +""" +Stores jobs in an array in RAM. Provides no persistence support. +""" + +from apscheduler.jobstores.base import JobStore + + +class RAMJobStore(JobStore): + def __init__(self): + self.jobs = [] + + def add_job(self, job): + self.jobs.append(job) + + def update_job(self, job): + pass + + def remove_job(self, job): + self.jobs.remove(job) + + def load_jobs(self): + pass + + def __repr__(self): + return '<%s>' % (self.__class__.__name__) http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py new file mode 100644 index 0000000..5eabf4b --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/redis_store.py @@ -0,0 +1,91 @@ +""" +Stores jobs in a Redis database. +""" +from uuid import uuid4 +from datetime import datetime +import logging + +from apscheduler.jobstores.base import JobStore +from apscheduler.job import Job + +try: + import cPickle as pickle +except ImportError: # pragma: nocover + import pickle + +try: + from redis import StrictRedis +except ImportError: # pragma: nocover + raise ImportError('RedisJobStore requires redis installed') + +try: + long = long +except NameError: + long = int + +logger = logging.getLogger(__name__) + + +class RedisJobStore(JobStore): + def __init__(self, db=0, key_prefix='jobs.', + pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args): + self.jobs = [] + self.pickle_protocol = pickle_protocol + self.key_prefix = key_prefix + + if db is None: + raise ValueError('The "db" parameter must not be empty') + if not key_prefix: + raise ValueError('The "key_prefix" parameter must not be empty') + + self.redis = StrictRedis(db=db, **connect_args) + + def add_job(self, job): + job.id = str(uuid4()) + job_state = job.__getstate__() + job_dict = { + 'job_state': pickle.dumps(job_state, self.pickle_protocol), + 'runs': '0', + 'next_run_time': job_state.pop('next_run_time').isoformat()} + self.redis.hmset(self.key_prefix + job.id, job_dict) + self.jobs.append(job) + + def remove_job(self, job): + self.redis.delete(self.key_prefix + job.id) + self.jobs.remove(job) + + def load_jobs(self): + jobs = [] + keys = self.redis.keys(self.key_prefix + '*') + pipeline = self.redis.pipeline() + for key in keys: + pipeline.hgetall(key) + results = pipeline.execute() + + for job_dict in results: + job_state = {} + try: + job = Job.__new__(Job) + job_state = pickle.loads(job_dict['job_state'.encode()]) + job_state['runs'] = long(job_dict['runs'.encode()]) + dateval = job_dict['next_run_time'.encode()].decode() + job_state['next_run_time'] = datetime.strptime( + dateval, '%Y-%m-%dT%H:%M:%S') + job.__setstate__(job_state) + jobs.append(job) + except Exception: + job_name = job_state.get('name', 
'(unknown)') + logger.exception('Unable to restore job "%s"', job_name) + self.jobs = jobs + + def update_job(self, job): + attrs = { + 'next_run_time': job.next_run_time.isoformat(), + 'runs': job.runs} + self.redis.hmset(self.key_prefix + job.id, attrs) + + def close(self): + self.redis.connection_pool.disconnect() + + def __repr__(self): + return '<%s>' % self.__class__.__name__ http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py new file mode 100644 index 0000000..d1be58f --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/shelve_store.py @@ -0,0 +1,74 @@ +""" +Stores jobs in a file governed by the :mod:`shelve` module. +""" + +import shelve +import pickle +import random +import logging + +from apscheduler.jobstores.base import JobStore +from apscheduler.job import Job +from apscheduler.util import itervalues + +logger = logging.getLogger(__name__) + + +class ShelveJobStore(JobStore): + MAX_ID = 1000000 + + def __init__(self, path, pickle_protocol=pickle.HIGHEST_PROTOCOL): + self.jobs = [] + self.path = path + self.pickle_protocol = pickle_protocol + self._open_store() + + def _open_store(self): + self.store = shelve.open(self.path, 'c', self.pickle_protocol) + + def _generate_id(self): + id = None + while not id: + id = str(random.randint(1, self.MAX_ID)) + if not id in self.store: + return id + + def add_job(self, job): + job.id = self._generate_id() + self.store[job.id] = job.__getstate__() + self.store.close() + self._open_store() + self.jobs.append(job) + + def update_job(self, job): + job_dict = self.store[job.id] + job_dict['next_run_time'] = job.next_run_time + job_dict['runs'] = job.runs + self.store[job.id] = job_dict + self.store.close() + self._open_store() + + def remove_job(self, job): + del self.store[job.id] + self.store.close() + self._open_store() + self.jobs.remove(job) + + def load_jobs(self): + jobs = [] + for job_dict in itervalues(self.store): + try: + job = Job.__new__(Job) + job.__setstate__(job_dict) + jobs.append(job) + except Exception: + job_name = job_dict.get('name', '(unknown)') + logger.exception('Unable to restore job "%s"', job_name) + + self.jobs = jobs + + def close(self): + self.store.close() + + def __repr__(self): + return '<%s (path=%s)>' % (self.__class__.__name__, self.path) http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py new file mode 100644 index 0000000..5b64a35 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/jobstores/sqlalchemy_store.py @@ -0,0 +1,91 @@ +""" +Stores jobs in a database table using SQLAlchemy. 
+""" +import pickle +import logging + +import sqlalchemy + +from apscheduler.jobstores.base import JobStore +from apscheduler.job import Job + +try: + from sqlalchemy import * +except ImportError: # pragma: nocover + raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed') + +logger = logging.getLogger(__name__) + + +class SQLAlchemyJobStore(JobStore): + def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', + metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL): + self.jobs = [] + self.pickle_protocol = pickle_protocol + + if engine: + self.engine = engine + elif url: + self.engine = create_engine(url) + else: + raise ValueError('Need either "engine" or "url" defined') + + if sqlalchemy.__version__ < '0.7': + pickle_coltype = PickleType(pickle_protocol, mutable=False) + else: + pickle_coltype = PickleType(pickle_protocol) + self.jobs_t = Table( + tablename, metadata or MetaData(), + Column('id', Integer, + Sequence(tablename + '_id_seq', optional=True), + primary_key=True), + Column('trigger', pickle_coltype, nullable=False), + Column('func_ref', String(1024), nullable=False), + Column('args', pickle_coltype, nullable=False), + Column('kwargs', pickle_coltype, nullable=False), + Column('name', Unicode(1024)), + Column('misfire_grace_time', Integer, nullable=False), + Column('coalesce', Boolean, nullable=False), + Column('max_runs', Integer), + Column('max_instances', Integer), + Column('next_run_time', DateTime, nullable=False), + Column('runs', BigInteger)) + + self.jobs_t.create(self.engine, True) + + def add_job(self, job): + job_dict = job.__getstate__() + result = self.engine.execute(self.jobs_t.insert().values(**job_dict)) + job.id = result.inserted_primary_key[0] + self.jobs.append(job) + + def remove_job(self, job): + delete = self.jobs_t.delete().where(self.jobs_t.c.id == job.id) + self.engine.execute(delete) + self.jobs.remove(job) + + def load_jobs(self): + jobs = [] + for row in self.engine.execute(select([self.jobs_t])): + try: + job = Job.__new__(Job) + job_dict = dict(row.items()) + job.__setstate__(job_dict) + jobs.append(job) + except Exception: + job_name = job_dict.get('name', '(unknown)') + logger.exception('Unable to restore job "%s"', job_name) + self.jobs = jobs + + def update_job(self, job): + job_dict = job.__getstate__() + update = self.jobs_t.update().where(self.jobs_t.c.id == job.id).\ + values(next_run_time=job_dict['next_run_time'], + runs=job_dict['runs']) + self.engine.execute(update) + + def close(self): + self.engine.dispose() + + def __repr__(self): + return '<%s (url=%s)>' % (self.__class__.__name__, self.engine.url) http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py new file mode 100644 index 0000000..319037a --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/scheduler.py @@ -0,0 +1,607 @@ +""" +This module is the main part of the library. It houses the Scheduler class +and related exceptions. 
+""" + +from threading import Thread, Event, Lock +from datetime import datetime, timedelta +from logging import getLogger +import os +import sys + +from apscheduler.util import * +from apscheduler.triggers import SimpleTrigger, IntervalTrigger, CronTrigger +from apscheduler.jobstores.ram_store import RAMJobStore +from apscheduler.job import Job, MaxInstancesReachedError +from apscheduler.events import * +from apscheduler.threadpool import ThreadPool + +logger = getLogger(__name__) + + +class SchedulerAlreadyRunningError(Exception): + """ + Raised when attempting to start or configure the scheduler when it's + already running. + """ + + def __str__(self): + return 'Scheduler is already running' + + +class Scheduler(object): + """ + This class is responsible for scheduling jobs and triggering + their execution. + """ + + _stopped = True + _thread = None + + def __init__(self, gconfig={}, **options): + self._wakeup = Event() + self._jobstores = {} + self._jobstores_lock = Lock() + self._listeners = [] + self._listeners_lock = Lock() + self._pending_jobs = [] + self.configure(gconfig, **options) + + def configure(self, gconfig={}, **options): + """ + Reconfigures the scheduler with the given options. Can only be done + when the scheduler isn't running. + """ + if self.running: + raise SchedulerAlreadyRunningError + + # Set general options + config = combine_opts(gconfig, 'apscheduler.', options) + self.misfire_grace_time = int(config.pop('misfire_grace_time', 1)) + self.coalesce = asbool(config.pop('coalesce', True)) + self.daemonic = asbool(config.pop('daemonic', True)) + self.standalone = asbool(config.pop('standalone', False)) + + # Configure the thread pool + if 'threadpool' in config: + self._threadpool = maybe_ref(config['threadpool']) + else: + threadpool_opts = combine_opts(config, 'threadpool.') + self._threadpool = ThreadPool(**threadpool_opts) + + # Configure job stores + jobstore_opts = combine_opts(config, 'jobstore.') + jobstores = {} + for key, value in jobstore_opts.items(): + store_name, option = key.split('.', 1) + opts_dict = jobstores.setdefault(store_name, {}) + opts_dict[option] = value + + for alias, opts in jobstores.items(): + classname = opts.pop('class') + cls = maybe_ref(classname) + jobstore = cls(**opts) + self.add_jobstore(jobstore, alias, True) + + def start(self): + """ + Starts the scheduler in a new thread. + + In threaded mode (the default), this method will return immediately + after starting the scheduler thread. + + In standalone mode, this method will block until there are no more + scheduled jobs. + """ + if self.running: + raise SchedulerAlreadyRunningError + + # Create a RAMJobStore as the default if there is no default job store + if not 'default' in self._jobstores: + self.add_jobstore(RAMJobStore(), 'default', True) + + # Schedule all pending jobs + for job, jobstore in self._pending_jobs: + self._real_add_job(job, jobstore, False) + del self._pending_jobs[:] + + self._stopped = False + if self.standalone: + self._main_loop() + else: + self._thread = Thread(target=self._main_loop, name='APScheduler') + self._thread.setDaemon(self.daemonic) + self._thread.start() + + def shutdown(self, wait=True, shutdown_threadpool=True, + close_jobstores=True): + """ + Shuts down the scheduler and terminates the thread. + Does not interrupt any currently running jobs. 
+ + :param wait: ``True`` to wait until all currently executing jobs have + finished (if ``shutdown_threadpool`` is also ``True``) + :param shutdown_threadpool: ``True`` to shut down the thread pool + :param close_jobstores: ``True`` to close all job stores after shutdown + """ + if not self.running: + return + + self._stopped = True + self._wakeup.set() + + # Shut down the thread pool + if shutdown_threadpool: + self._threadpool.shutdown(wait) + + # Wait until the scheduler thread terminates + if self._thread: + self._thread.join() + + # Close all job stores + if close_jobstores: + for jobstore in itervalues(self._jobstores): + jobstore.close() + + @property + def running(self): + thread_alive = self._thread and self._thread.isAlive() + standalone = getattr(self, 'standalone', False) + return not self._stopped and (standalone or thread_alive) + + def add_jobstore(self, jobstore, alias, quiet=False): + """ + Adds a job store to this scheduler. + + :param jobstore: job store to be added + :param alias: alias for the job store + :param quiet: True to suppress scheduler thread wakeup + :type jobstore: instance of + :class:`~apscheduler.jobstores.base.JobStore` + :type alias: str + """ + self._jobstores_lock.acquire() + try: + if alias in self._jobstores: + raise KeyError('Alias "%s" is already in use' % alias) + self._jobstores[alias] = jobstore + jobstore.load_jobs() + finally: + self._jobstores_lock.release() + + # Notify listeners that a new job store has been added + self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias)) + + # Notify the scheduler so it can scan the new job store for jobs + if not quiet: + self._wakeup.set() + + def remove_jobstore(self, alias, close=True): + """ + Removes the job store by the given alias from this scheduler. + + :param close: ``True`` to close the job store after removing it + :type alias: str + """ + self._jobstores_lock.acquire() + try: + jobstore = self._jobstores.pop(alias) + if not jobstore: + raise KeyError('No such job store: %s' % alias) + finally: + self._jobstores_lock.release() + + # Close the job store if requested + if close: + jobstore.close() + + # Notify listeners that a job store has been removed + self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_REMOVED, alias)) + + def add_listener(self, callback, mask=EVENT_ALL): + """ + Adds a listener for scheduler events. When a matching event occurs, + ``callback`` is executed with the event object as its sole argument. + If the ``mask`` parameter is not provided, the callback will receive + events of all types. + + :param callback: any callable that takes one argument + :param mask: bitmask that indicates which events should be listened to + """ + self._listeners_lock.acquire() + try: + self._listeners.append((callback, mask)) + finally: + self._listeners_lock.release() + + def remove_listener(self, callback): + """ + Removes a previously added event listener. 
+ """ + self._listeners_lock.acquire() + try: + for i, (cb, _) in enumerate(self._listeners): + if callback == cb: + del self._listeners[i] + finally: + self._listeners_lock.release() + + def _notify_listeners(self, event): + self._listeners_lock.acquire() + try: + listeners = tuple(self._listeners) + finally: + self._listeners_lock.release() + + for cb, mask in listeners: + if event.code & mask: + try: + cb(event) + except: + logger.exception('Error notifying listener') + + def _real_add_job(self, job, jobstore, wakeup): + job.compute_next_run_time(datetime.now()) + if not job.next_run_time: + raise ValueError('Not adding job since it would never be run') + + self._jobstores_lock.acquire() + try: + try: + store = self._jobstores[jobstore] + except KeyError: + raise KeyError('No such job store: %s' % jobstore) + store.add_job(job) + finally: + self._jobstores_lock.release() + + # Notify listeners that a new job has been added + event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job) + self._notify_listeners(event) + + logger.info('Added job "%s" to job store "%s"', job, jobstore) + + # Notify the scheduler about the new job + if wakeup: + self._wakeup.set() + + def add_job(self, trigger, func, args, kwargs, jobstore='default', + **options): + """ + Adds the given job to the job list and notifies the scheduler thread. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). + + :param trigger: trigger that determines when ``func`` is called + :param func: callable to run at the given time + :param args: list of positional arguments to call func with + :param kwargs: dict of keyword arguments to call func with + :param jobstore: alias of the job store to store the job in + :rtype: :class:`~apscheduler.job.Job` + """ + job = Job(trigger, func, args or [], kwargs or {}, + options.pop('misfire_grace_time', self.misfire_grace_time), + options.pop('coalesce', self.coalesce), **options) + if not self.running: + self._pending_jobs.append((job, jobstore)) + logger.info('Adding job tentatively -- it will be properly ' + 'scheduled when the scheduler starts') + else: + self._real_add_job(job, jobstore, True) + return job + + def _remove_job(self, job, alias, jobstore): + jobstore.remove_job(job) + + # Notify listeners that a job has been removed + event = JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job) + self._notify_listeners(event) + + logger.info('Removed job "%s"', job) + + def add_date_job(self, func, date, args=None, kwargs=None, **options): + """ + Schedules a job to be completed on a specific date and time. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). + + :param func: callable to run at the given time + :param date: the date/time to run the job at + :param name: name of the job + :param jobstore: stored the job in the named (or given) job store + :param misfire_grace_time: seconds after the designated run time that + the job is still allowed to be run + :type date: :class:`datetime.date` + :rtype: :class:`~apscheduler.job.Job` + """ + trigger = SimpleTrigger(date) + return self.add_job(trigger, func, args, kwargs, **options) + + def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0, + seconds=0, start_date=None, args=None, kwargs=None, + **options): + """ + Schedules a job to be completed on specified intervals. 
+ Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). + + :param func: callable to run + :param weeks: number of weeks to wait + :param days: number of days to wait + :param hours: number of hours to wait + :param minutes: number of minutes to wait + :param seconds: number of seconds to wait + :param start_date: when to first execute the job and start the + counter (default is after the given interval) + :param args: list of positional arguments to call func with + :param kwargs: dict of keyword arguments to call func with + :param name: name of the job + :param jobstore: alias of the job store to add the job to + :param misfire_grace_time: seconds after the designated run time that + the job is still allowed to be run + :rtype: :class:`~apscheduler.job.Job` + """ + interval = timedelta(weeks=weeks, days=days, hours=hours, + minutes=minutes, seconds=seconds) + trigger = IntervalTrigger(interval, start_date) + return self.add_job(trigger, func, args, kwargs, **options) + + def add_cron_job(self, func, year=None, month=None, day=None, week=None, + day_of_week=None, hour=None, minute=None, second=None, + start_date=None, args=None, kwargs=None, **options): + """ + Schedules a job to be completed on times that match the given + expressions. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). + + :param func: callable to run + :param year: year to run on + :param month: month to run on + :param day: day of month to run on + :param week: week of the year to run on + :param day_of_week: weekday to run on (0 = Monday) + :param hour: hour to run on + :param second: second to run on + :param args: list of positional arguments to call func with + :param kwargs: dict of keyword arguments to call func with + :param name: name of the job + :param jobstore: alias of the job store to add the job to + :param misfire_grace_time: seconds after the designated run time that + the job is still allowed to be run + :return: the scheduled job + :rtype: :class:`~apscheduler.job.Job` + """ + trigger = CronTrigger(year=year, month=month, day=day, week=week, + day_of_week=day_of_week, hour=hour, + minute=minute, second=second, + start_date=start_date) + return self.add_job(trigger, func, args, kwargs, **options) + + def cron_schedule(self, **options): + """ + Decorator version of :meth:`add_cron_job`. + This decorator does not wrap its host function. + Unscheduling decorated functions is possible by passing the ``job`` + attribute of the scheduled function to :meth:`unschedule_job`. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). + """ + def inner(func): + func.job = self.add_cron_job(func, **options) + return func + return inner + + def interval_schedule(self, **options): + """ + Decorator version of :meth:`add_interval_job`. + This decorator does not wrap its host function. + Unscheduling decorated functions is possible by passing the ``job`` + attribute of the scheduled function to :meth:`unschedule_job`. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). + """ + def inner(func): + func.job = self.add_interval_job(func, **options) + return func + return inner + + def get_jobs(self): + """ + Returns a list of all scheduled jobs. 
+ + :return: list of :class:`~apscheduler.job.Job` objects + """ + self._jobstores_lock.acquire() + try: + jobs = [] + for jobstore in itervalues(self._jobstores): + jobs.extend(jobstore.jobs) + return jobs + finally: + self._jobstores_lock.release() + + def unschedule_job(self, job): + """ + Removes a job, preventing it from being run any more. + """ + self._jobstores_lock.acquire() + try: + for alias, jobstore in iteritems(self._jobstores): + if job in list(jobstore.jobs): + self._remove_job(job, alias, jobstore) + return + finally: + self._jobstores_lock.release() + + raise KeyError('Job "%s" is not scheduled in any job store' % job) + + def unschedule_func(self, func): + """ + Removes all jobs that would execute the given function. + """ + found = False + self._jobstores_lock.acquire() + try: + for alias, jobstore in iteritems(self._jobstores): + for job in list(jobstore.jobs): + if job.func == func: + self._remove_job(job, alias, jobstore) + found = True + finally: + self._jobstores_lock.release() + + if not found: + raise KeyError('The given function is not scheduled in this ' + 'scheduler') + + def print_jobs(self, out=None): + """ + Prints out a textual listing of all jobs currently scheduled on this + scheduler. + + :param out: a file-like object to print to (defaults to **sys.stdout** + if nothing is given) + """ + out = out or sys.stdout + job_strs = [] + self._jobstores_lock.acquire() + try: + for alias, jobstore in iteritems(self._jobstores): + job_strs.append('Jobstore %s:' % alias) + if jobstore.jobs: + for job in jobstore.jobs: + job_strs.append(' %s' % job) + else: + job_strs.append(' No scheduled jobs') + finally: + self._jobstores_lock.release() + + out.write(os.linesep.join(job_strs) + os.linesep) + + def _run_job(self, job, run_times): + """ + Acts as a harness that runs the actual job code in a thread. + """ + for run_time in run_times: + # See if the job missed its run time window, and handle possible + # misfires accordingly + difference = datetime.now() - run_time + grace_time = timedelta(seconds=job.misfire_grace_time) + if difference > grace_time: + # Notify listeners about a missed run + event = JobEvent(EVENT_JOB_MISSED, job, run_time) + self._notify_listeners(event) + logger.warning('Run time of job "%s" was missed by %s', + job, difference) + else: + try: + job.add_instance() + except MaxInstancesReachedError: + event = JobEvent(EVENT_JOB_MISSED, job, run_time) + self._notify_listeners(event) + logger.warning('Execution of job "%s" skipped: ' + 'maximum number of running instances ' + 'reached (%d)', job, job.max_instances) + break + + logger.info('Running job "%s" (scheduled at %s)', job, + run_time) + + try: + retval = job.func(*job.args, **job.kwargs) + except: + # Notify listeners about the exception + exc, tb = sys.exc_info()[1:] + event = JobEvent(EVENT_JOB_ERROR, job, run_time, + exception=exc, traceback=tb) + self._notify_listeners(event) + + logger.exception('Job "%s" raised an exception', job) + else: + # Notify listeners about successful execution + event = JobEvent(EVENT_JOB_EXECUTED, job, run_time, + retval=retval) + self._notify_listeners(event) + + logger.info('Job "%s" executed successfully', job) + + job.remove_instance() + + # If coalescing is enabled, don't attempt any further runs + if job.coalesce: + break + + def _process_jobs(self, now): + """ + Iterates through jobs in every jobstore, starts pending jobs + and figures out the next wakeup time. 
+ """ + next_wakeup_time = None + self._jobstores_lock.acquire() + try: + for alias, jobstore in iteritems(self._jobstores): + for job in tuple(jobstore.jobs): + run_times = job.get_run_times(now) + if run_times: + self._threadpool.submit(self._run_job, job, run_times) + + # Increase the job's run count + if job.coalesce: + job.runs += 1 + else: + job.runs += len(run_times) + + # Update the job, but don't keep finished jobs around + if job.compute_next_run_time( + now + timedelta(microseconds=1)): + jobstore.update_job(job) + else: + self._remove_job(job, alias, jobstore) + + if not next_wakeup_time: + next_wakeup_time = job.next_run_time + elif job.next_run_time: + next_wakeup_time = min(next_wakeup_time, + job.next_run_time) + return next_wakeup_time + finally: + self._jobstores_lock.release() + + def _main_loop(self): + """Executes jobs on schedule.""" + + logger.info('Scheduler started') + self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START)) + + self._wakeup.clear() + while not self._stopped: + logger.debug('Looking for jobs to run') + now = datetime.now() + next_wakeup_time = self._process_jobs(now) + + # Sleep until the next job is scheduled to be run, + # a new job is added or the scheduler is stopped + if next_wakeup_time is not None: + wait_seconds = time_difference(next_wakeup_time, now) + logger.debug('Next wakeup is due at %s (in %f seconds)', + next_wakeup_time, wait_seconds) + try: + self._wakeup.wait(wait_seconds) + except IOError: # Catch errno 514 on some Linux kernels + pass + self._wakeup.clear() + elif self.standalone: + logger.debug('No jobs left; shutting down scheduler') + self.shutdown() + break + else: + logger.debug('No jobs; waiting until a job is added') + try: + self._wakeup.wait() + except IOError: # Catch errno 514 on some Linux kernels + pass + self._wakeup.clear() + + logger.info('Scheduler has been shut down') + self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN)) http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py new file mode 100644 index 0000000..8ec47da --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py @@ -0,0 +1,133 @@ +""" +Generic thread pool class. Modeled after Java's ThreadPoolExecutor. +Please note that this ThreadPool does *not* fully implement the PEP 3148 +ThreadPool! +""" + +from threading import Thread, Lock, currentThread +from weakref import ref +import logging +import atexit + +try: + from queue import Queue, Empty +except ImportError: + from Queue import Queue, Empty + +logger = logging.getLogger(__name__) +_threadpools = set() + + +# Worker threads are daemonic in order to let the interpreter exit without +# an explicit shutdown of the thread pool. The following trick is necessary +# to allow worker threads to finish cleanly. 
+def _shutdown_all(): + for pool_ref in tuple(_threadpools): + pool = pool_ref() + if pool: + pool.shutdown() + +atexit.register(_shutdown_all) + + +class ThreadPool(object): + def __init__(self, core_threads=0, max_threads=20, keepalive=1): + """ + :param core_threads: maximum number of persistent threads in the pool + :param max_threads: maximum number of total threads in the pool + :param thread_class: callable that creates a Thread object + :param keepalive: seconds to keep non-core worker threads waiting + for new tasks + """ + self.core_threads = core_threads + self.max_threads = max(max_threads, core_threads, 1) + self.keepalive = keepalive + self._queue = Queue() + self._threads_lock = Lock() + self._threads = set() + self._shutdown = False + + _threadpools.add(ref(self)) + logger.info('Started thread pool with %d core threads and %s maximum ' + 'threads', core_threads, max_threads or 'unlimited') + + def _adjust_threadcount(self): + self._threads_lock.acquire() + try: + if self.num_threads < self.max_threads: + self._add_thread(self.num_threads < self.core_threads) + finally: + self._threads_lock.release() + + def _add_thread(self, core): + t = Thread(target=self._run_jobs, args=(core,)) + t.setDaemon(True) + t.start() + self._threads.add(t) + + def _run_jobs(self, core): + logger.debug('Started worker thread') + block = True + timeout = None + if not core: + block = self.keepalive > 0 + timeout = self.keepalive + + while True: + try: + func, args, kwargs = self._queue.get(block, timeout) + except Empty: + break + + if self._shutdown: + break + + try: + func(*args, **kwargs) + except: + logger.exception('Error in worker thread') + + self._threads_lock.acquire() + self._threads.remove(currentThread()) + self._threads_lock.release() + + logger.debug('Exiting worker thread') + + @property + def num_threads(self): + return len(self._threads) + + def submit(self, func, *args, **kwargs): + if self._shutdown: + raise RuntimeError('Cannot schedule new tasks after shutdown') + + self._queue.put((func, args, kwargs)) + self._adjust_threadcount() + + def shutdown(self, wait=True): + if self._shutdown: + return + + logging.info('Shutting down thread pool') + self._shutdown = True + _threadpools.remove(ref(self)) + + self._threads_lock.acquire() + for _ in range(self.num_threads): + self._queue.put((None, None, None)) + self._threads_lock.release() + + if wait: + self._threads_lock.acquire() + threads = tuple(self._threads) + self._threads_lock.release() + for thread in threads: + thread.join() + + def __repr__(self): + if self.max_threads: + threadcount = '%d/%d' % (self.num_threads, self.max_threads) + else: + threadcount = '%d' % self.num_threads + + return '<ThreadPool at %x; threads=%s>' % (id(self), threadcount) http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py new file mode 100644 index 0000000..74a9788 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/__init__.py @@ -0,0 +1,3 @@ +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.simple import SimpleTrigger 
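For context: the ThreadPool above is the execution backend that Scheduler._process_jobs submits _run_job calls to. A minimal usage sketch follows; it assumes the ambari_agent directory is on sys.path so the package imports as plain apscheduler (the same way the vendored modules import each other), and the callable and host name are illustrative only, not part of this patch:

    from time import sleep
    from apscheduler.threadpool import ThreadPool

    def check(host):
        # stand-in work item; in the agent the submitted callables are alert jobs
        print('checking %s' % host)

    pool = ThreadPool(core_threads=2, max_threads=5, keepalive=1)
    pool.submit(check, 'c6401.ambari.apache.org')  # queued, then picked up by a daemon worker
    sleep(1)                                       # workers are daemonic, so give them a moment before exiting
    pool.shutdown(wait=True)                       # drains workers; submit() afterwards raises RuntimeError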
http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py new file mode 100644 index 0000000..9e69f72 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/__init__.py @@ -0,0 +1,144 @@ +from datetime import date, datetime + +from apscheduler.triggers.cron.fields import * +from apscheduler.util import datetime_ceil, convert_to_datetime, iteritems + + +class CronTrigger(object): + FIELD_NAMES = ('year', 'month', 'day', 'week', 'day_of_week', 'hour', + 'minute', 'second') + FIELDS_MAP = {'year': BaseField, + 'month': BaseField, + 'week': WeekField, + 'day': DayOfMonthField, + 'day_of_week': DayOfWeekField, + 'hour': BaseField, + 'minute': BaseField, + 'second': BaseField} + + def __init__(self, **values): + self.start_date = values.pop('start_date', None) + if self.start_date: + self.start_date = convert_to_datetime(self.start_date) + + # Check field names and yank out all None valued fields + for key, value in list(iteritems(values)): + if key not in self.FIELD_NAMES: + raise TypeError('Invalid field name: %s' % key) + if value is None: + del values[key] + + self.fields = [] + assign_defaults = False + for field_name in self.FIELD_NAMES: + if field_name in values: + exprs = values.pop(field_name) + is_default = False + assign_defaults = not values + elif assign_defaults: + exprs = DEFAULT_VALUES[field_name] + is_default = True + else: + exprs = '*' + is_default = True + + field_class = self.FIELDS_MAP[field_name] + field = field_class(field_name, exprs, is_default) + self.fields.append(field) + + def _increment_field_value(self, dateval, fieldnum): + """ + Increments the designated field and resets all less significant fields + to their minimum values. 
+ + :type dateval: datetime + :type fieldnum: int + :type amount: int + :rtype: tuple + :return: a tuple containing the new date, and the number of the field + that was actually incremented + """ + i = 0 + values = {} + while i < len(self.fields): + field = self.fields[i] + if not field.REAL: + if i == fieldnum: + fieldnum -= 1 + i -= 1 + else: + i += 1 + continue + + if i < fieldnum: + values[field.name] = field.get_value(dateval) + i += 1 + elif i > fieldnum: + values[field.name] = field.get_min(dateval) + i += 1 + else: + value = field.get_value(dateval) + maxval = field.get_max(dateval) + if value == maxval: + fieldnum -= 1 + i -= 1 + else: + values[field.name] = value + 1 + i += 1 + + return datetime(**values), fieldnum + + def _set_field_value(self, dateval, fieldnum, new_value): + values = {} + for i, field in enumerate(self.fields): + if field.REAL: + if i < fieldnum: + values[field.name] = field.get_value(dateval) + elif i > fieldnum: + values[field.name] = field.get_min(dateval) + else: + values[field.name] = new_value + + return datetime(**values) + + def get_next_fire_time(self, start_date): + if self.start_date: + start_date = max(start_date, self.start_date) + next_date = datetime_ceil(start_date) + fieldnum = 0 + while 0 <= fieldnum < len(self.fields): + field = self.fields[fieldnum] + curr_value = field.get_value(next_date) + next_value = field.get_next_value(next_date) + + if next_value is None: + # No valid value was found + next_date, fieldnum = self._increment_field_value( + next_date, fieldnum - 1) + elif next_value > curr_value: + # A valid, but higher than the starting value, was found + if field.REAL: + next_date = self._set_field_value( + next_date, fieldnum, next_value) + fieldnum += 1 + else: + next_date, fieldnum = self._increment_field_value( + next_date, fieldnum) + else: + # A valid value was found, no changes necessary + fieldnum += 1 + + if fieldnum >= 0: + return next_date + + def __str__(self): + options = ["%s='%s'" % (f.name, str(f)) for f in self.fields + if not f.is_default] + return 'cron[%s]' % (', '.join(options)) + + def __repr__(self): + options = ["%s='%s'" % (f.name, str(f)) for f in self.fields + if not f.is_default] + if self.start_date: + options.append("start_date='%s'" % self.start_date.isoformat(' ')) + return '<%s (%s)>' % (self.__class__.__name__, ', '.join(options)) http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py new file mode 100644 index 0000000..b5d2919 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/expressions.py @@ -0,0 +1,194 @@ +""" +This module contains the expressions applicable for CronTrigger's fields. 
+""" + +from calendar import monthrange +import re + +from apscheduler.util import asint + +__all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression', + 'WeekdayPositionExpression', 'LastDayOfMonthExpression') + + +WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'] + + +class AllExpression(object): + value_re = re.compile(r'\*(?:/(?P<step>\d+))?$') + + def __init__(self, step=None): + self.step = asint(step) + if self.step == 0: + raise ValueError('Increment must be higher than 0') + + def get_next_value(self, date, field): + start = field.get_value(date) + minval = field.get_min(date) + maxval = field.get_max(date) + start = max(start, minval) + + if not self.step: + next = start + else: + distance_to_next = (self.step - (start - minval)) % self.step + next = start + distance_to_next + + if next <= maxval: + return next + + def __str__(self): + if self.step: + return '*/%d' % self.step + return '*' + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, self.step) + + +class RangeExpression(AllExpression): + value_re = re.compile( + r'(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$') + + def __init__(self, first, last=None, step=None): + AllExpression.__init__(self, step) + first = asint(first) + last = asint(last) + if last is None and step is None: + last = first + if last is not None and first > last: + raise ValueError('The minimum value in a range must not be ' + 'higher than the maximum') + self.first = first + self.last = last + + def get_next_value(self, date, field): + start = field.get_value(date) + minval = field.get_min(date) + maxval = field.get_max(date) + + # Apply range limits + minval = max(minval, self.first) + if self.last is not None: + maxval = min(maxval, self.last) + start = max(start, minval) + + if not self.step: + next = start + else: + distance_to_next = (self.step - (start - minval)) % self.step + next = start + distance_to_next + + if next <= maxval: + return next + + def __str__(self): + if self.last != self.first and self.last is not None: + range = '%d-%d' % (self.first, self.last) + else: + range = str(self.first) + + if self.step: + return '%s/%d' % (range, self.step) + return range + + def __repr__(self): + args = [str(self.first)] + if self.last != self.first and self.last is not None or self.step: + args.append(str(self.last)) + if self.step: + args.append(str(self.step)) + return "%s(%s)" % (self.__class__.__name__, ', '.join(args)) + + +class WeekdayRangeExpression(RangeExpression): + value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?', + re.IGNORECASE) + + def __init__(self, first, last=None): + try: + first_num = WEEKDAYS.index(first.lower()) + except ValueError: + raise ValueError('Invalid weekday name "%s"' % first) + + if last: + try: + last_num = WEEKDAYS.index(last.lower()) + except ValueError: + raise ValueError('Invalid weekday name "%s"' % last) + else: + last_num = None + + RangeExpression.__init__(self, first_num, last_num) + + def __str__(self): + if self.last != self.first and self.last is not None: + return '%s-%s' % (WEEKDAYS[self.first], WEEKDAYS[self.last]) + return WEEKDAYS[self.first] + + def __repr__(self): + args = ["'%s'" % WEEKDAYS[self.first]] + if self.last != self.first and self.last is not None: + args.append("'%s'" % WEEKDAYS[self.last]) + return "%s(%s)" % (self.__class__.__name__, ', '.join(args)) + + +class WeekdayPositionExpression(AllExpression): + options = ['1st', '2nd', '3rd', '4th', '5th', 'last'] + value_re = re.compile(r'(?P<option_name>%s) 
+(?P<weekday_name>(?:\d+|\w+))' + % '|'.join(options), re.IGNORECASE) + + def __init__(self, option_name, weekday_name): + try: + self.option_num = self.options.index(option_name.lower()) + except ValueError: + raise ValueError('Invalid weekday position "%s"' % option_name) + + try: + self.weekday = WEEKDAYS.index(weekday_name.lower()) + except ValueError: + raise ValueError('Invalid weekday name "%s"' % weekday_name) + + def get_next_value(self, date, field): + # Figure out the weekday of the month's first day and the number + # of days in that month + first_day_wday, last_day = monthrange(date.year, date.month) + + # Calculate which day of the month is the first of the target weekdays + first_hit_day = self.weekday - first_day_wday + 1 + if first_hit_day <= 0: + first_hit_day += 7 + + # Calculate what day of the month the target weekday would be + if self.option_num < 5: + target_day = first_hit_day + self.option_num * 7 + else: + target_day = first_hit_day + ((last_day - first_hit_day) / 7) * 7 + + if target_day <= last_day and target_day >= date.day: + return target_day + + def __str__(self): + return '%s %s' % (self.options[self.option_num], + WEEKDAYS[self.weekday]) + + def __repr__(self): + return "%s('%s', '%s')" % (self.__class__.__name__, + self.options[self.option_num], + WEEKDAYS[self.weekday]) + + +class LastDayOfMonthExpression(AllExpression): + value_re = re.compile(r'last', re.IGNORECASE) + + def __init__(self): + pass + + def get_next_value(self, date, field): + return monthrange(date.year, date.month)[1] + + def __str__(self): + return 'last' + + def __repr__(self): + return "%s()" % self.__class__.__name__ http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py new file mode 100644 index 0000000..be5e5e3 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/cron/fields.py @@ -0,0 +1,100 @@ +""" +Fields represent CronTrigger options which map to :class:`~datetime.datetime` +fields. 
+""" + +from calendar import monthrange + +from apscheduler.triggers.cron.expressions import * + +__all__ = ('MIN_VALUES', 'MAX_VALUES', 'DEFAULT_VALUES', 'BaseField', + 'WeekField', 'DayOfMonthField', 'DayOfWeekField') + + +MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1, + 'day_of_week': 0, 'hour': 0, 'minute': 0, 'second': 0} +MAX_VALUES = {'year': 2 ** 63, 'month': 12, 'day:': 31, 'week': 53, + 'day_of_week': 6, 'hour': 23, 'minute': 59, 'second': 59} +DEFAULT_VALUES = {'year': '*', 'month': 1, 'day': 1, 'week': '*', + 'day_of_week': '*', 'hour': 0, 'minute': 0, 'second': 0} + + +class BaseField(object): + REAL = True + COMPILERS = [AllExpression, RangeExpression] + + def __init__(self, name, exprs, is_default=False): + self.name = name + self.is_default = is_default + self.compile_expressions(exprs) + + def get_min(self, dateval): + return MIN_VALUES[self.name] + + def get_max(self, dateval): + return MAX_VALUES[self.name] + + def get_value(self, dateval): + return getattr(dateval, self.name) + + def get_next_value(self, dateval): + smallest = None + for expr in self.expressions: + value = expr.get_next_value(dateval, self) + if smallest is None or (value is not None and value < smallest): + smallest = value + + return smallest + + def compile_expressions(self, exprs): + self.expressions = [] + + # Split a comma-separated expression list, if any + exprs = str(exprs).strip() + if ',' in exprs: + for expr in exprs.split(','): + self.compile_expression(expr) + else: + self.compile_expression(exprs) + + def compile_expression(self, expr): + for compiler in self.COMPILERS: + match = compiler.value_re.match(expr) + if match: + compiled_expr = compiler(**match.groupdict()) + self.expressions.append(compiled_expr) + return + + raise ValueError('Unrecognized expression "%s" for field "%s"' % + (expr, self.name)) + + def __str__(self): + expr_strings = (str(e) for e in self.expressions) + return ','.join(expr_strings) + + def __repr__(self): + return "%s('%s', '%s')" % (self.__class__.__name__, self.name, + str(self)) + + +class WeekField(BaseField): + REAL = False + + def get_value(self, dateval): + return dateval.isocalendar()[1] + + +class DayOfMonthField(BaseField): + COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression, + LastDayOfMonthExpression] + + def get_max(self, dateval): + return monthrange(dateval.year, dateval.month)[1] + + +class DayOfWeekField(BaseField): + REAL = False + COMPILERS = BaseField.COMPILERS + [WeekdayRangeExpression] + + def get_value(self, dateval): + return dateval.weekday() http://git-wip-us.apache.org/repos/asf/ambari/blob/14e79ed1/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py new file mode 100644 index 0000000..dd16d77 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/triggers/interval.py @@ -0,0 +1,39 @@ +from datetime import datetime, timedelta +from math import ceil + +from apscheduler.util import convert_to_datetime, timedelta_seconds + + +class IntervalTrigger(object): + def __init__(self, interval, start_date=None): + if not isinstance(interval, timedelta): + raise TypeError('interval must be a timedelta') + if start_date: + start_date = convert_to_datetime(start_date) + + self.interval = interval + self.interval_length = 
timedelta_seconds(self.interval) + if self.interval_length == 0: + self.interval = timedelta(seconds=1) + self.interval_length = 1 + + if start_date is None: + self.start_date = datetime.now() + self.interval + else: + self.start_date = convert_to_datetime(start_date) + + def get_next_fire_time(self, start_date): + if start_date < self.start_date: + return self.start_date + + timediff_seconds = timedelta_seconds(start_date - self.start_date) + next_interval_num = int(ceil(timediff_seconds / self.interval_length)) + return self.start_date + self.interval * next_interval_num + + def __str__(self): + return 'interval[%s]' % str(self.interval) + + def __repr__(self): + return "<%s (interval=%s, start_date=%s)>" % ( + self.__class__.__name__, repr(self.interval), + repr(self.start_date))
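For context: CronTrigger and IntervalTrigger both expose get_next_fire_time(start_date), which is what ultimately drives the next_run_time that _process_jobs recomputes above (through each job's compute_next_run_time). A small sketch of that contract, assuming the same apscheduler import path as above; the dates are arbitrary examples, not taken from this patch:

    from datetime import datetime, timedelta
    from apscheduler.triggers.cron import CronTrigger
    from apscheduler.triggers.interval import IntervalTrigger

    now = datetime(2014, 8, 17, 19, 26, 50)

    # run at second 0 of every minute; coarser, unspecified fields default to '*'
    cron = CronTrigger(second=0)
    print(cron.get_next_fire_time(now))                               # 2014-08-17 19:27:00

    # run every 30 seconds, anchored at an explicit start_date
    interval = IntervalTrigger(timedelta(seconds=30), start_date=now)
    print(interval.get_next_fire_time(now + timedelta(seconds=5)))    # 2014-08-17 19:27:20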