Erosen has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/72624


Change subject: adds non-functional job serialization framework
......................................................................

adds non-functional job serialization framework

Change-Id: I7fccca2853fe4376ef503a66f256da4768664b85
---
M wikimetrics/config/celery_config.py
M wikimetrics/controllers/demo.py
M wikimetrics/models/__init__.py
M wikimetrics/models/job.py
A wikimetrics/models/job_response.py
M wikimetrics/models/metric_job.py
M wikimetrics/run.py
7 files changed, 138 insertions(+), 25 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/wikimetrics 
refs/changes/24/72624/1

diff --git a/wikimetrics/config/celery_config.py 
b/wikimetrics/config/celery_config.py
index e767b44..578be36 100644
--- a/wikimetrics/config/celery_config.py
+++ b/wikimetrics/config/celery_config.py
@@ -3,4 +3,5 @@
 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
 CELERY_TASK_RESULT_EXPIRES = 3600
 CELERY_DISABLE_RATE_LIMITS = True
+CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
 DEBUG = True
diff --git a/wikimetrics/controllers/demo.py b/wikimetrics/controllers/demo.py
index f472e10..d18a5a3 100644
--- a/wikimetrics/controllers/demo.py
+++ b/wikimetrics/controllers/demo.py
@@ -28,6 +28,7 @@
             user_ids = db_session.query(WikiUser.mediawiki_userid).all()
         
         job = MetricJob(RandomMetric(), user_ids, 'enwiki')
+        #from nose.tools import set_trace; set_trace()
         res = job.run.delay(job).get()
         print user_ids
         return str(res)
diff --git a/wikimetrics/models/__init__.py b/wikimetrics/models/__init__.py
index 1820c15..820ec10 100644
--- a/wikimetrics/models/__init__.py
+++ b/wikimetrics/models/__init__.py
@@ -5,6 +5,7 @@
 from concat_metrics_job import *
 from job import *
 from metric_job import *
+from job_response import *
 from multi_project_metric_job import *
 from user import *
 from wikiuser import *
diff --git a/wikimetrics/models/job.py b/wikimetrics/models/job.py
index d75550e..24f60c3 100644
--- a/wikimetrics/models/job.py
+++ b/wikimetrics/models/job.py
@@ -1,6 +1,12 @@
 import collections
+import pickle
 from sqlalchemy import Column, Integer, String, ForeignKey
+from sqlalchemy.orm import relationship, backref
 from celery import group, chord
+from celery.utils.log import get_task_logger
+from celery import current_task
+import traceback
+import logging
 
 from wikimetrics.configurables import db, queue
 
@@ -27,10 +33,53 @@
 """
 
 
+
+task_logger = get_task_logger(__name__)
+sh = logging.StreamHandler()
+task_logger.addHandler(sh)
+
+
 class JobStatus(object):
     CREATED  = 'CREATED'
     STARTED  = 'STARTED'
     FINISHED = 'FINISHED'
+
+
+class Dammy(db.WikimetricsBase):
+    __tablename__ = 'dummy'
+    id = Column(Integer, primary_key=True)
+    def __init__(self, job):
+        self.job = job
+    @queue.task
+    def rando_taks(self):
+        pass
+
+# this method needs to be at module level for pickling purposes
+def from_db(job_id):
+    """
+    All `Job` subclasses should implement this to ensure that they
+    can be resumed from the database
+    
+    Parameters:
+        job_id  : primary key in the job table which can be used to
+                  locate the serialized information with which a new job
+                  can be created
+    
+    Returns:
+        a new instance of the Job() class which can be re-run
+    """
+    print 'calling from_db!'
+    session = db.get_session()
+    job = session.query(Job).get(job_id)
+    # note the cast to str is necessary because by default
+    # sqlalchemy returns unicode, which changes the byte representation
+    state_dict = pickle.loads(str(job.state))
+    job.__dict__.update(state_dict)
+    #return job
+    #return Dammy(job)
+    #return Dammy(None)
+    return job.__dict__
+    #return {}
 
 
 class Job(db.WikimetricsBase):
@@ -46,33 +95,50 @@
     id = Column(Integer, primary_key=True)
     user_id = Column(Integer, ForeignKey('user.id'))
     classpath = Column(String(200))
+    state = Column(String(1000))
     status = Column(String(100), default=JobStatus.CREATED)
     result_id = Column(String(50))
+    #parent_id = Column(Integer, ForeignKey('job.id'))
+    #children = relationship('Job')
+            #backref=backref("parent", remote_side='job.id'))
     
     # FIXME: calling ConcatMetricsJob().run uses this run instead of the 
JobNode one
     #@queue.task
     #def run(self):
         #pass
     
-    @classmethod
-    def from_db(cls, job_id):
+    def get_state_dict(self):
         """
-        All `Job` subclasses should implement this to ensure that they
-        can be resumed from the database
-        
-        Parameters:
-            cls     : from_db is a class_method so it requires a class
-                      istance as it's first arg, so that it can be called
-                      with JobSubclass.form_db(job_id)
-            job_id  : primary key in the job table which can be used to
-                      locate the serialized information with which a new job
-                      can be created
-        
-        Returns:
-            a new instance of the Job() class which can be re-run
+        All `Job` subclasses should implement this to ensure that any non-
+        database-mapped attributes are saved and reloaded during pickle
+        serialization.  The workflow goes as follows:
+            
+            1) upon calling job.delay(), the job.__reduce__() function is 
called
+            2) job.__reduce__() calls job.save()
+            3) job.save() calls job.get_state_dict()
+            4) job.save() stores state_dict to db-mapped attribute job.state
+            5) job.__reduce__() returns job.from_db() and job.id
+                   as the unpickling function and state, respectively
+            6) job.from_db() is called on unpickling end
+            7) job.from_db() gets new job object from db using job_id
+            8) job.from_db() updates job object using db.state with:
+                   job.__dict__.update(str(pickle.loads(job.state)))
+            9) job.from_db() returns new job to be used in celery context
         """
-        pass
-
+        return {}
+    
+    def save(self):
+        
+        state_dict = self.get_state_dict()
+        self.state = pickle.dumps(state_dict)
+        session = db.get_session()
+        session.add(self)
+        session.commit()
+    
+    def __reduce__(self):
+        self.save()
+        return(from_db, (self.id,))
+    
     def __repr__(self):
         return '<Job("{0}")>'.format(self.id)
     
@@ -91,10 +157,16 @@
     def child_tasks(self):
         return group(child.run.s(child) for child in self.children)
     
-    @queue.task
+    @queue.task(serializer='pickle')
     def run(self):
-        children_then_finish = chord(self.child_tasks())(self.finish.s())
-        children_then_finish.get()
+        with open('celery_log_{0}.log'.format(current_task.request.id), 'w') 
as fout:
+            fout.write('trace_back at run()r:\n%s' % traceback.format_stack())
+            try:
+                children_then_finish = 
chord(self.child_tasks())(self.finish.s())
+                children_then_finish.get()
+            except:
+                task_logger.exception('caught exception within worker:')
+                fout.write('caught exception within 
worker:\n{0}'.format(traceback.format_exc()))
     
     @queue.task
     def finish(self):
diff --git a/wikimetrics/models/job_response.py 
b/wikimetrics/models/job_response.py
new file mode 100644
index 0000000..d8c46ba
--- /dev/null
+++ b/wikimetrics/models/job_response.py
@@ -0,0 +1,38 @@
+import pickle
+from wikimetrics.configurables import queue, db
+import job
+from .multi_project_metric_job import MultiProjectMetricJob
+
+__all__ = [
+    'JobResponse',
+]
+
+
+class JobResponse(job.JobNode):
+    """
+    Represents a batch of cohort-metric jobs created by the 
+    user during a single jobs/create/ workflow.  This is also
+    intended to be the unit of work which could be easily re-run.
+    """
+    
+    
+    def __init__(self, cohort_metrics, user_id):
+        """
+        Parameters:
+        
+            cohort_metrics [(Cohort, Metric),...]  : list of cohort-metric 
pairs to be run
+        """
+        super(JobResponse, self).__init__(user_id=user_id)
+        self.children = [MultiProjectMetricJob(c, m) for c, m in 
cohort_metrics]
+    
+    def get_state_dict(self):
+        """
+        place any non-sqlalchemy attributes which need to be saved in this
+        dict, and they will be pickled by save() and unpickled by from_db(),
+        using self.__dict__.update(pickle.loads(self.state))
+        """
+        return {'test_attr' : None}
+    
+    @queue.task
+    def finish(job_results):
+        return job_results
diff --git a/wikimetrics/models/metric_job.py b/wikimetrics/models/metric_job.py
index 02621be..e145a99 100644
--- a/wikimetrics/models/metric_job.py
+++ b/wikimetrics/models/metric_job.py
@@ -1,5 +1,5 @@
 import job
-from wikimetrics.configurables import queue, db, app
+from wikimetrics.configurables import queue, db
 
 __all__ = [
     'MetricJob',
@@ -14,6 +14,7 @@
     """
     
     def __init__(self, metric, user_ids, project):
+        super(MetricJob, self).__init__()
         self.metric = metric
         self.user_ids = user_ids
         self.project = project
@@ -21,5 +22,4 @@
     @queue.task
     def run(self):
         session = db.get_mw_session(self.project)
-        with app.test_request_context():
-            return self.metric(self.user_ids, session)
+        return self.metric(self.user_ids, session)
diff --git a/wikimetrics/run.py b/wikimetrics/run.py
index ae62c15..5e47ada 100644
--- a/wikimetrics/run.py
+++ b/wikimetrics/run.py
@@ -40,7 +40,7 @@
         dest='override_config',
     )
     parser.add_argument(
-        'mode',
+        '--mode',
         nargs='?',
         default='import',
         choices=[
@@ -83,7 +83,7 @@
 # wikimetrics.configurables
 ##################################
 parser = setup_parser()
-args = parser.parse_args()
+args, others = parser.parse_known_args()
 logger.info('running with arguments:\n%s', pprint.pformat(vars(args)))
 
 # runs the appropriate config function (web, celery, test)

-- 
To view, visit https://gerrit.wikimedia.org/r/72624
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7fccca2853fe4376ef503a66f256da4768664b85
Gerrit-PatchSet: 1
Gerrit-Project: analytics/wikimetrics
Gerrit-Branch: master
Gerrit-Owner: Erosen <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to