Erosen has submitted this change and it was merged.
Change subject: adds non-functional job serialization framework
......................................................................
adds non-functional job serialization framework
Change-Id: I7fccca2853fe4376ef503a66f256da4768664b85
---
M wikimetrics/config/celery_config.py
M wikimetrics/controllers/demo.py
M wikimetrics/models/__init__.py
M wikimetrics/models/job.py
A wikimetrics/models/job_response.py
M wikimetrics/models/metric_job.py
M wikimetrics/run.py
7 files changed, 138 insertions(+), 25 deletions(-)
Approvals:
Erosen: Verified; Looks good to me, approved
diff --git a/wikimetrics/config/celery_config.py
b/wikimetrics/config/celery_config.py
index e767b44..578be36 100644
--- a/wikimetrics/config/celery_config.py
+++ b/wikimetrics/config/celery_config.py
@@ -3,4 +3,5 @@
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_DISABLE_RATE_LIMITS = True
+CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
DEBUG = True
diff --git a/wikimetrics/controllers/demo.py b/wikimetrics/controllers/demo.py
index f472e10..d18a5a3 100644
--- a/wikimetrics/controllers/demo.py
+++ b/wikimetrics/controllers/demo.py
@@ -28,6 +28,7 @@
user_ids = db_session.query(WikiUser.mediawiki_userid).all()
job = MetricJob(RandomMetric(), user_ids, 'enwiki')
+ #from nose.tools import set_trace; set_trace()
res = job.run.delay(job).get()
print user_ids
return str(res)
diff --git a/wikimetrics/models/__init__.py b/wikimetrics/models/__init__.py
index 1820c15..820ec10 100644
--- a/wikimetrics/models/__init__.py
+++ b/wikimetrics/models/__init__.py
@@ -5,6 +5,7 @@
from concat_metrics_job import *
from job import *
from metric_job import *
+from job_response import *
from multi_project_metric_job import *
from user import *
from wikiuser import *
diff --git a/wikimetrics/models/job.py b/wikimetrics/models/job.py
index d75550e..24f60c3 100644
--- a/wikimetrics/models/job.py
+++ b/wikimetrics/models/job.py
@@ -1,6 +1,12 @@
import collections
+import pickle
from sqlalchemy import Column, Integer, String, ForeignKey
+from sqlalchemy.orm import relationship, backref
from celery import group, chord
+from celery.utils.log import get_task_logger
+from celery import current_task
+import traceback
+import logging
from wikimetrics.configurables import db, queue
@@ -27,10 +33,53 @@
"""
+
+task_logger = get_task_logger(__name__)
+sh = logging.StreamHandler()
+task_logger.addHandler(sh)
+
+
class JobStatus(object):
CREATED = 'CREATED'
STARTED = 'STARTED'
FINISHED = 'FINISHED'
+
+
+class Dammy(db.WikimetricsBase):
+ __tablename__ = 'dummy'
+ id = Column(Integer, primary_key=True)
+ def __init__(self, job):
+ self.job = job
+ @queue.task
+ def rando_taks(self):
+ pass
+
+# this method needs to be at module level for pickling purposes
+def from_db(job_id):
+ """
+ All `Job` subclasses should implement this to ensure that they
+ can be resumed from the database
+
+ Parameters:
+ job_id : primary key in the job table which can be used to
+ locate the serialized information with which a new job
+ can be created
+
+ Returns:
+ a new instance of the Job() class which can be re-run
+ """
+ print 'calling from_db!'
+ session = db.get_session()
+ job = session.query(Job).get(job_id)
+ # note the cast to str is necessary because by default
+ # sqlalchemy returns unicode, which changes the byte representation
+ state_dict = pickle.loads(str(job.state))
+ job.__dict__.update(state_dict)
+ #return job
+ #return Dammy(job)
+ #return Dammy(None)
+ return job.__dict__
+ #return {}
class Job(db.WikimetricsBase):
@@ -46,33 +95,50 @@
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('user.id'))
classpath = Column(String(200))
+ state = Column(String(1000))
status = Column(String(100), default=JobStatus.CREATED)
result_id = Column(String(50))
+ #parent_id = Column(Integer, ForeignKey('job.id'))
+ #children = relationship('Job')
+ #backref=backref("parent", remote_side='job.id'))
# FIXME: calling ConcatMetricsJob().run uses this run instead of the
JobNode one
#@queue.task
#def run(self):
#pass
- @classmethod
- def from_db(cls, job_id):
+ def get_state_dict(self):
"""
- All `Job` subclasses should implement this to ensure that they
- can be resumed from the database
-
- Parameters:
- cls : from_db is a class_method so it requires a class
- istance as it's first arg, so that it can be called
- with JobSubclass.form_db(job_id)
- job_id : primary key in the job table which can be used to
- locate the serialized information with which a new job
- can be created
-
- Returns:
- a new instance of the Job() class which can be re-run
+ All `Job` subclasses should implement this to ensure that any non-
+ database-mapped attributes are saved and reloaded during pickle
+ serialization. The workflow goes as follows:
+
+ 1) upon calling job.delay(), the job.__reduce__() function is
called
+ 2) job.__reduce__() calls job.save()
+ 3) job.save() calls job.get_state_dict()
+ 4) job.save() stores state_dict to db-mapped attribute job.state
+ 5) job.__reduce__() returns job.from_db() and job.id
+ as the unpickling function and state, respectively
+ 6) job.from_db() is called on unpickling end
+ 7) job.from_db() gets new job object from db using job_id
+ 8) job.from_db() updates job object using db.state with:
+ job.__dict__.update(str(pickle.loads(job.state)))
+ 9) job.from_db() returns new job to be used in celery context
"""
- pass
-
+ return {}
+
+ def save(self):
+
+ state_dict = self.get_state_dict()
+ self.state = pickle.dumps(state_dict)
+ session = db.get_session()
+ session.add(self)
+ session.commit()
+
+ def __reduce__(self):
+ self.save()
+ return(from_db, (self.id,))
+
def __repr__(self):
return '<Job("{0}")>'.format(self.id)
@@ -91,10 +157,16 @@
def child_tasks(self):
return group(child.run.s(child) for child in self.children)
- @queue.task
+ @queue.task(serializer='pickle')
def run(self):
- children_then_finish = chord(self.child_tasks())(self.finish.s())
- children_then_finish.get()
+ with open('celery_log_{0}.log'.format(current_task.request.id), 'w')
as fout:
+ fout.write('trace_back at run()r:\n%s' % traceback.format_stack())
+ try:
+ children_then_finish =
chord(self.child_tasks())(self.finish.s())
+ children_then_finish.get()
+ except:
+ task_logger.exception('caught exception within worker:')
+ fout.write('caught exception within
worker:\n{0}'.format(traceback.format_exc()))
@queue.task
def finish(self):
diff --git a/wikimetrics/models/job_response.py
b/wikimetrics/models/job_response.py
new file mode 100644
index 0000000..d8c46ba
--- /dev/null
+++ b/wikimetrics/models/job_response.py
@@ -0,0 +1,38 @@
+import pickle
+from wikimetrics.configurables import queue, db
+import job
+from .multi_project_metric_job import MultiProjectMetricJob
+
+__all__ = [
+ 'JobResponse',
+]
+
+
+class JobResponse(job.JobNode):
+ """
+ Represents a batch of cohort-metric jobs created by the
+ user during a single jobs/create/ workflow. This is also
+ intended to be the unit of work which could be easily re-run.
+ """
+
+
+ def __init__(self, cohort_metrics, user_id):
+ """
+ Parameters:
+
+ cohort_metrics [(Cohort, Metric),...] : list of cohort-metric
pairs to be run
+ """
+ super(JobResponse, self).__init__(user_id=user_id)
+ self.children = [MultiProjectMetricJob(c, m) for c, m in
cohort_metrics]
+
+ def get_state_dict(self):
+ """
+ place any non-sqlalchemy attributes which need to be saved in this
+        dict, and they will be pickled by save() and unpickled by from_db(),
+ using self.__dict__.update(pickle.loads(self.state))
+ """
+ return {'test_attr' : None}
+
+ @queue.task
+ def finish(job_results):
+ return job_results
diff --git a/wikimetrics/models/metric_job.py b/wikimetrics/models/metric_job.py
index 02621be..e145a99 100644
--- a/wikimetrics/models/metric_job.py
+++ b/wikimetrics/models/metric_job.py
@@ -1,5 +1,5 @@
import job
-from wikimetrics.configurables import queue, db, app
+from wikimetrics.configurables import queue, db
__all__ = [
'MetricJob',
@@ -14,6 +14,7 @@
"""
def __init__(self, metric, user_ids, project):
+ super(MetricJob, self).__init__()
self.metric = metric
self.user_ids = user_ids
self.project = project
@@ -21,5 +22,4 @@
@queue.task
def run(self):
session = db.get_mw_session(self.project)
- with app.test_request_context():
- return self.metric(self.user_ids, session)
+ return self.metric(self.user_ids, session)
diff --git a/wikimetrics/run.py b/wikimetrics/run.py
index ae62c15..5e47ada 100644
--- a/wikimetrics/run.py
+++ b/wikimetrics/run.py
@@ -40,7 +40,7 @@
dest='override_config',
)
parser.add_argument(
- 'mode',
+ '--mode',
nargs='?',
default='import',
choices=[
@@ -83,7 +83,7 @@
# wikimetrics.configurables
##################################
parser = setup_parser()
-args = parser.parse_args()
+args, others = parser.parse_known_args()
logger.info('running with arguments:\n%s', pprint.pformat(vars(args)))
# runs the appropriate config function (web, celery, test)
--
To view, visit https://gerrit.wikimedia.org/r/72624
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I7fccca2853fe4376ef503a66f256da4768664b85
Gerrit-PatchSet: 1
Gerrit-Project: analytics/wikimetrics
Gerrit-Branch: master
Gerrit-Owner: Erosen <[email protected]>
Gerrit-Reviewer: Erosen <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits