Mforns has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/373078 )

Change subject: [WIP] Close all sessions to avoid database connection errors
......................................................................

[WIP] Close all sessions to avoid database connection errors

Bug: T173585
Change-Id: Iad61bca63ccc6dabf306687f523ff65fff269875
---
M wikimetrics/api/cohorts.py
M wikimetrics/api/replication_lag.py
M wikimetrics/controllers/authentication.py
M wikimetrics/controllers/cohorts.py
M wikimetrics/controllers/demo.py
M wikimetrics/controllers/reports.py
M wikimetrics/forms/cohort_upload.py
M wikimetrics/forms/validators.py
M wikimetrics/models/report_nodes/metric_report.py
M wikimetrics/models/report_nodes/report.py
M wikimetrics/models/report_nodes/run_program_metrics_report.py
M wikimetrics/models/report_nodes/run_report.py
M wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
M wikimetrics/models/storage/cohort.py
M wikimetrics/models/storage/report.py
M wikimetrics/models/storage/task_error.py
M wikimetrics/models/validate_cohort.py
M wikimetrics/schedules/daily.py
18 files changed, 87 insertions(+), 10 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/wikimetrics refs/changes/78/373078/1

diff --git a/wikimetrics/api/cohorts.py b/wikimetrics/api/cohorts.py
index 3b797cb..c6223c7 100644
--- a/wikimetrics/api/cohorts.py
+++ b/wikimetrics/api/cohorts.py
@@ -99,7 +99,9 @@
             cohort  : a logical Cohort object
         """
         db_session = db.get_session()
-        return db_session.query(CohortStore).get(cohort.id)
+        cohort = db_session.query(CohortStore).get(cohort.id)
+        db_session.close()
+        return cohort
 
     def fetch_by_id(self, cohort_id):
         """
@@ -109,7 +111,9 @@
             cohort  : a logical Cohort object
         """
         db_session = db.get_session()
-        return db_session.query(CohortStore).get(cohort_id)
+        cohort = db_session.query(CohortStore).get(cohort_id)
+        db_session.close()
+        return cohort
 
     def get_cohort_by_name(self, db_session, name):
         """
@@ -473,6 +477,7 @@
 
         if cu != 1:
             db_session.rollback()
+            db_session.close()
             raise DatabaseError('No owner or multiple owners in cohort.')
         else:
             try:
@@ -501,6 +506,8 @@
             except DatabaseError:
                 db_session.rollback()
                 raise DatabaseError('Owner attempt to delete a cohort failed.')
+            finally:
+                db_session.close()
 
     def delete_viewer_cohort(self, db_session, user_id, cohort_id):
         """
@@ -518,4 +525,7 @@
 
         if cu != 1:
             db_session.rollback()
+            db_session.close()
             raise DatabaseError('Viewer attempt delete cohort failed.')
+        else:
+            db_session.close()
diff --git a/wikimetrics/api/replication_lag.py b/wikimetrics/api/replication_lag.py
index 032ebd8..dff178f 100644
--- a/wikimetrics/api/replication_lag.py
+++ b/wikimetrics/api/replication_lag.py
@@ -43,6 +43,7 @@
             .order_by(Revision.rev_timestamp.desc())\
             .limit(1)\
             .scalar()
+        session.close()
 
         return timestamp is None or \
             timestamp < datetime.now() - self._lag_threshold
diff --git a/wikimetrics/controllers/authentication.py b/wikimetrics/controllers/authentication.py
index e6bb188..2fac42d 100644
--- a/wikimetrics/controllers/authentication.py
+++ b/wikimetrics/controllers/authentication.py
@@ -61,7 +61,9 @@
     Callback required by Flask-Login.  Gets the User object from the database.
     """
     db_session = db.get_session()
-    return UserStore.get(db_session, user_id)
+    user = UserStore.get(db_session, user_id)
+    db_session.close()
+    return user
 
 
 @login_manager.unauthorized_handler
@@ -86,6 +88,7 @@
     if type(current_user) is UserStore:
         current_user.logout(db_session)
     logout_user()
+    db_session.close()
     return redirect(url_for('home_index'))
 
 
@@ -146,16 +149,20 @@
                 db_session.commit()
             except:
                 db_session.rollback()
+                db_session.close()
                 raise
 
         except MultipleResultsFound:
             flash('Multiple users found with your id!!! Contact 
Administrator', 'error')
+            db_session.close()
             return redirect(url_for('login'))
 
         user.login(db_session)
         if login_user(user):
             user.detach_from(db_session)
             del session['request_token']
+
+        db_session.close()
 
     except Exception:
         flash('You need to grant the app permissions in order to login.', 
'error')
@@ -218,9 +225,11 @@
                     db_session.commit()
                 except:
                     db_session.rollback()
+                    db_session.close()
                     raise
             
             except MultipleResultsFound:
+                db_session.close()
                 return 'Multiple users found with your id!!! Contact 
Administrator'
             
             user.login(db_session)
@@ -229,6 +238,8 @@
                 redirect_to = session.get('next') or url_for('home_index')
                 redirect_to = urllib2.unquote(redirect_to)
                 return redirect(redirect_to)
+
+            db_session.close()
     
     flash('Was not allowed to authenticate you with Google.', 'error')
     return redirect(url_for('login'))
@@ -249,4 +260,5 @@
             user = 
db_session.query(UserStore).filter_by(email='[email protected]').one()
             user.login(db_session)
             login_user(user)
+            session.close()
             return ''
diff --git a/wikimetrics/controllers/cohorts.py b/wikimetrics/controllers/cohorts.py
index 0274629..4112cb8 100644
--- a/wikimetrics/controllers/cohorts.py
+++ b/wikimetrics/controllers/cohorts.py
@@ -47,6 +47,7 @@
     """
     session = db.get_session()
     tags = g.tag_service.get_all_tags(session)
+    session.close()
     return render_template('cohorts.html', tags=json.dumps(tags))
 
 
@@ -59,6 +60,7 @@
     else:
         cohorts = g.cohort_service.get_list(db_session, current_user.id)
 
+    db_session.close()
     return json_response(cohorts=[{
         'id': c.id,
         'name': c.name,
@@ -89,6 +91,8 @@
         # don't need to roll back session because it's just a query
         app.logger.exception(str(e))
         return 'Error fetching membership for this cohort', 500
+    finally:
+        session.close()
 
 
 @app.route('/cohorts/<string:cohort_id>/membership/delete', methods=['POST'])
@@ -108,6 +112,8 @@
     except Exception as e:
         app.logger.exception(str(e))
         return json_error(e.message)
+    finally:
+        session.close()
 
 
 @app.route('/cohorts/detail/<string:name_or_id>')
@@ -142,6 +148,8 @@
         return 'You are not allowed to access this Cohort', 401
     except NoResultFound:
         return 'Could not find this Cohort', 404
+    finally:
+        db_session.close()
 
     return json_response(cohort_dict)
 
@@ -242,6 +250,7 @@
     name = request.args.get('name')
     session = db.get_session()
     available = g.cohort_service.get_cohort_by_name(session, name) is None
+    session.close()
     return json.dumps(available)
 
 
@@ -271,6 +280,8 @@
         return json_error('You are not allowed to access this cohort')
     except NoResultFound:
         return json_error('This cohort does not exist')
+    finally:
+        session.close()
 
 
 def num_users(session, cohort_id):
@@ -327,6 +338,8 @@
     except DatabaseError as e:
         session.rollback()
         return json_error(e.message)
+    finally:
+        session.close()
 
 
 @app.route('/cohorts/<int:cohort_id>/tag/add/', defaults={'tag': None}, 
methods=['POST'])
@@ -370,6 +383,8 @@
     except DatabaseError as e:
         session.rollback()
         return json_error(e.message)
+    finally:
+        session.close()
 
     return json_response(data)
 
@@ -384,6 +399,7 @@
         .filter(CohortTagStore.tag_id == TagStore.id) \
         .all()
     tag_names = [tag[0] for tag in tag_names]
+    session.close()
     return json.dumps(sorted(tag_names))
 
 
@@ -397,4 +413,5 @@
     session.commit()
 
     tags = g.tag_service.get_all_tags(session)
+    session.close()
     return json_response(message='success', 
tagsAutocompleteList=json.dumps(tags))
diff --git a/wikimetrics/controllers/demo.py b/wikimetrics/controllers/demo.py
index c54384c..cb54e5c 100644
--- a/wikimetrics/controllers/demo.py
+++ b/wikimetrics/controllers/demo.py
@@ -15,8 +15,10 @@
     def get_session_and_leave_open():
         session = db.get_session()
         session.query(ReportStore).all()
+        session.close()
         session2 = db.get_session()
         session2.query(ReportStore).all()
+        session.close()
         return ''
 
     @app.route('/demo/create/fake-<string:project>-users/<int:n>')
diff --git a/wikimetrics/controllers/reports.py b/wikimetrics/controllers/reports.py
index a762faf..f9b6717 100644
--- a/wikimetrics/controllers/reports.py
+++ b/wikimetrics/controllers/reports.py
@@ -60,6 +60,7 @@
         .one()[0]
 
     data = report_result_json(result_key).data
+    db_session.close()
 
     # call would throw an exception if report cannot be made public
     ReportStore.make_report_public(
@@ -118,6 +119,7 @@
             TaskErrorStore.task_type == 'report',
             TaskErrorStore.task_type == None))\
         .all()
+    db_session.close()
     # TODO: update status for all reports at all times (not just show_in_ui 
ones)
     # update status for each report and build response
     reports = []
@@ -187,8 +189,8 @@
         pj = db_session.query(ReportStore)\
             .filter(ReportStore.result_key == result_key)\
             .one()
-
         celery_task = Report.task.AsyncResult(pj.queue_result_key)
+        db_session.close()
         return (celery_task, pj)
     except NoResultFound:
         # don't need to roll back session because it's just a query
@@ -252,6 +254,7 @@
             break
 
         user_names = g.cohort_service.get_wikiusernames_for_cohort(cohort_id, 
session)
+        session.close()
 
     return user_names
 
@@ -470,6 +473,7 @@
     session = db.get_session()
     report = session.query(ReportStore).get(report_id)
     RunReport.rerun(report)
+    session.close()
     return json_response(message='Report scheduled for rerun')
 
 
@@ -481,6 +485,7 @@
 #     if not db_report:
 #        return json_error('no task exists with id: {0}'.format(result_key))
 #     celery_task = Report.task.AsyncResult(db_report.result_key)
+#     db_session.close()
 #     app.logger.debug('revoking task: %s', celery_task.id)
 #     from celery.task.control import revoke
 #     celery_task.revoke()
diff --git a/wikimetrics/forms/cohort_upload.py b/wikimetrics/forms/cohort_upload.py
index 6543da5..41f79f9 100644
--- a/wikimetrics/forms/cohort_upload.py
+++ b/wikimetrics/forms/cohort_upload.py
@@ -67,6 +67,7 @@
         if self.centralauth.data is True:
             ca_session = db.get_ca_session()
             records = g.centralauth_service.expand_via_centralauth(records, 
ca_session)
+            ca_session.close()
 
         self.records = records
 
diff --git a/wikimetrics/forms/validators.py b/wikimetrics/forms/validators.py
index dac6fdb..5f39cde 100644
--- a/wikimetrics/forms/validators.py
+++ b/wikimetrics/forms/validators.py
@@ -19,9 +19,9 @@
 
     def __call__(self, form, field):
         session = db.get_session()
-
         if CohortService().get_cohort_by_name(session, field.data) is not None:
             raise ValidationError('This cohort name is taken.')
+        session.close()
 
 
 class CohortNameLegalCharacters(object):
diff --git a/wikimetrics/models/report_nodes/metric_report.py b/wikimetrics/models/report_nodes/metric_report.py
index 01bfb8b..db85e87 100644
--- a/wikimetrics/models/report_nodes/metric_report.py
+++ b/wikimetrics/models/report_nodes/metric_report.py
@@ -35,6 +35,7 @@
             str(WikiUserKey(key, self.project, self.cohort_id)) : value
             for key, value in results_by_user.items()
         }
+        session.close()
         if not len(results):
             results = {NO_RESULTS : self.metric.default_result}
         return results
diff --git a/wikimetrics/models/report_nodes/report.py b/wikimetrics/models/report_nodes/report.py
index 10c709e..5bd5b8b 100644
--- a/wikimetrics/models/report_nodes/report.py
+++ b/wikimetrics/models/report_nodes/report.py
@@ -122,6 +122,8 @@
             except:
                 session.rollback()
                 raise
+            finally:
+                session.close()
     
     def __repr__(self):
         if self.persistent_id is not None:
@@ -145,6 +147,7 @@
             if task_id:
                 pj.queue_result_key = task_id
             session.commit()
+            session.close()
     
     def run(self):
         """
@@ -233,6 +236,7 @@
             pj.result_key = self.result_key
             db_session.add(pj)
             db_session.commit()
+            db_session.close()
         
         merged = {self.result_key: results}
         for child_result in child_results:
diff --git a/wikimetrics/models/report_nodes/run_program_metrics_report.py b/wikimetrics/models/report_nodes/run_program_metrics_report.py
index 177d555..3cc854b 100644
--- a/wikimetrics/models/report_nodes/run_program_metrics_report.py
+++ b/wikimetrics/models/report_nodes/run_program_metrics_report.py
@@ -55,8 +55,9 @@
         # First make sure this is a valid cohort
         if cohort_store_object is not None and cohort_store_object.validated:
             self.cohort = cohort_service.convert(cohort_store_object)
+            db_session = db.get_session()
             validate_report = ValidateProgramMetricsReport(self.cohort,
-                                                           db.get_session(),
+                                                           db_session,
                                                            
user_id=self.user_id)
             self.cohort.size = validate_report.unique_users
             self.parameters = {
@@ -93,6 +94,7 @@
                                  self.get_bytes_added_report()]
             else:
                 self.children = [validate_report]
+            db_session.close()
             return super(RunProgramMetricsReport, self).run()
 
         else:
@@ -154,7 +156,10 @@
         if self.public is False:
             return
         rs = ReportService()
-        rs.write_report_to_file(self, results, db.get_session())
+
+        db_session = db.get_session()
+        rs.write_report_to_file(self, results, db_session)
+        db_session.close()
 
     def get_aggregate_by_user_report(self, parameters):
         metric_dict = parameters['metric']
diff --git a/wikimetrics/models/report_nodes/run_report.py b/wikimetrics/models/report_nodes/run_report.py
index 720bd36..7a53df6 100644
--- a/wikimetrics/models/report_nodes/run_report.py
+++ b/wikimetrics/models/report_nodes/run_report.py
@@ -74,6 +74,7 @@
         cohort_dict = parameters['cohort']
         session = db.get_session()
         cohort = cohort_service.get(session, user_id, by_id=cohort_dict['id'])
+        session.close()
 
         parameters['cohort']['size'] = cohort.size
 
@@ -146,7 +147,9 @@
             return
 
         rs = ReportService()
-        rs.write_report_to_file(self, results, db.get_session())
+        db_session = db.get_session()
+        rs.write_report_to_file(self, results, db_session)
+        db_session.close()
 
     # TODO, this method belongs on a different class and it should not be a 
class method
     @classmethod
diff --git a/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py b/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
index b41813c..f13279c 100644
--- a/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
+++ b/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
@@ -32,6 +32,7 @@
         service = CohortService()
         session = db.get_session()
         self.usernames = service.get_wikiusernames_for_cohort(cohort.id, 
session)
+        session.close()
 
         self.children = [
             MultiProjectMetricReport(cohort, metric, *args, **kwargs)
diff --git a/wikimetrics/models/storage/cohort.py b/wikimetrics/models/storage/cohort.py
index e396b29..9a4767c 100644
--- a/wikimetrics/models/storage/cohort.py
+++ b/wikimetrics/models/storage/cohort.py
@@ -54,6 +54,7 @@
         wikiusers = self.filter_wikiuser_query(
             db_session.query(WikiUserStore.mediawiki_userid)
         ).all()
+        db_session.close()
         return (r.mediawiki_userid for r in wikiusers)
 
     def __len__(self):
@@ -66,11 +67,13 @@
             the number of users in this cohort
         """
         db_session = db.get_session()
-        return db_session.query(func.count(CohortWikiUserStore.id)) \
+        length = db_session.query(func.count(CohortWikiUserStore.id)) \
             .join(WikiUserStore) \
             .filter(CohortWikiUserStore.cohort_id == self.id) \
             .filter(WikiUserStore.valid) \
             .one()[0]
+        db_session.close()
+        return length
 
     def group_by_project(self):
         """
@@ -90,6 +93,7 @@
         user_id_projects = self.filter_wikiuser_query(
             db_session.query(WikiUserStore.mediawiki_userid, 
WikiUserStore.project)
         ).order_by(WikiUserStore.project).all()
+        db_session.close()
 
         if not len(user_id_projects):
             return [(self.default_project, None)]
diff --git a/wikimetrics/models/storage/report.py b/wikimetrics/models/storage/report.py
index 4558be5..7882a4d 100644
--- a/wikimetrics/models/storage/report.py
+++ b/wikimetrics/models/storage/report.py
@@ -53,7 +53,10 @@
             if not existing_session:
                 existing_session = db.get_session()
                 existing_session.add(self)
-            existing_session.commit()
+                existing_session.commit()
+                existing_session.close()
+            else:
+                existing_session.commit()
 
     @staticmethod
     def update_reports(report_ids, owner_id, public=None, recurrent=None):
diff --git a/wikimetrics/models/storage/task_error.py b/wikimetrics/models/storage/task_error.py
index 0c9a18c..54d2aa8 100644
--- a/wikimetrics/models/storage/task_error.py
+++ b/wikimetrics/models/storage/task_error.py
@@ -35,6 +35,7 @@
             TaskErrorStore.update(db_session, existing, message, traceback)
         else:
             TaskErrorStore.create(db_session, task_type, task_id, message, 
traceback)
+        db_session.close()
 
     @staticmethod
     def get(db_session, task_type, task_id):
diff --git a/wikimetrics/models/validate_cohort.py b/wikimetrics/models/validate_cohort.py
index 546e0f7..a45fb1b 100644
--- a/wikimetrics/models/validate_cohort.py
+++ b/wikimetrics/models/validate_cohort.py
@@ -107,12 +107,15 @@
             session.rollback()
             app.logger.error(str(e))
             return None
+        finally:
+            session.close()
 
     def run(self):
         session = db.get_session()
         cohort = session.query(CohortStore).get(self.cohort_id)
         cohort.validation_queue_key = current_task.request.id
         session.commit()
+        session.close()
         self.validate_records(session, cohort)
 
     def validate_records(self, session, cohort):
diff --git a/wikimetrics/schedules/daily.py b/wikimetrics/schedules/daily.py
index 06c8306..fd63d94 100644
--- a/wikimetrics/schedules/daily.py
+++ b/wikimetrics/schedules/daily.py
@@ -67,6 +67,8 @@
         task_logger.error('Problem running recurring reports: {}'.format(
             traceback.format_exc()
         ))
+    finally:
+        session.close()
 
 
 if queue.conf.get('DEBUG'):
@@ -78,3 +80,5 @@
         session2 = db.get_session()
         session2.query(ReportStore).first()
         session.query(ReportStore).first()
+        session.close()
+        session2.close()

-- 
To view, visit https://gerrit.wikimedia.org/r/373078
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iad61bca63ccc6dabf306687f523ff65fff269875
Gerrit-PatchSet: 1
Gerrit-Project: analytics/wikimetrics
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to