Mforns has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/373078 )
Change subject: [WIP] Close all sessions to avoid database connection errors
......................................................................
[WIP] Close all sessions to avoid database connection errors
Bug: T173585
Change-Id: Iad61bca63ccc6dabf306687f523ff65fff269875
---
M wikimetrics/api/cohorts.py
M wikimetrics/api/replication_lag.py
M wikimetrics/controllers/authentication.py
M wikimetrics/controllers/cohorts.py
M wikimetrics/controllers/demo.py
M wikimetrics/controllers/reports.py
M wikimetrics/forms/cohort_upload.py
M wikimetrics/forms/validators.py
M wikimetrics/models/report_nodes/metric_report.py
M wikimetrics/models/report_nodes/report.py
M wikimetrics/models/report_nodes/run_program_metrics_report.py
M wikimetrics/models/report_nodes/run_report.py
M wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
M wikimetrics/models/storage/cohort.py
M wikimetrics/models/storage/report.py
M wikimetrics/models/storage/task_error.py
M wikimetrics/models/validate_cohort.py
M wikimetrics/schedules/daily.py
18 files changed, 87 insertions(+), 10 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/wikimetrics refs/changes/78/373078/1
diff --git a/wikimetrics/api/cohorts.py b/wikimetrics/api/cohorts.py
index 3b797cb..c6223c7 100644
--- a/wikimetrics/api/cohorts.py
+++ b/wikimetrics/api/cohorts.py
@@ -99,7 +99,9 @@
cohort : a logical Cohort object
"""
db_session = db.get_session()
- return db_session.query(CohortStore).get(cohort.id)
+ cohort = db_session.query(CohortStore).get(cohort.id)
+ db_session.close()
+ return cohort
def fetch_by_id(self, cohort_id):
"""
@@ -109,7 +111,9 @@
cohort : a logical Cohort object
"""
db_session = db.get_session()
- return db_session.query(CohortStore).get(cohort_id)
+ cohort = db_session.query(CohortStore).get(cohort_id)
+ db_session.close()
+ return cohort
def get_cohort_by_name(self, db_session, name):
"""
@@ -473,6 +477,7 @@
if cu != 1:
db_session.rollback()
+ db_session.close()
raise DatabaseError('No owner or multiple owners in cohort.')
else:
try:
@@ -501,6 +506,8 @@
except DatabaseError:
db_session.rollback()
raise DatabaseError('Owner attempt to delete a cohort failed.')
+ finally:
+ db_session.close()
def delete_viewer_cohort(self, db_session, user_id, cohort_id):
"""
@@ -518,4 +525,7 @@
if cu != 1:
db_session.rollback()
+ db_session.close()
raise DatabaseError('Viewer attempt delete cohort failed.')
+ else:
+ db_session.close()
diff --git a/wikimetrics/api/replication_lag.py
b/wikimetrics/api/replication_lag.py
index 032ebd8..dff178f 100644
--- a/wikimetrics/api/replication_lag.py
+++ b/wikimetrics/api/replication_lag.py
@@ -43,6 +43,7 @@
.order_by(Revision.rev_timestamp.desc())\
.limit(1)\
.scalar()
+ session.close()
return timestamp is None or \
timestamp < datetime.now() - self._lag_threshold
diff --git a/wikimetrics/controllers/authentication.py
b/wikimetrics/controllers/authentication.py
index e6bb188..2fac42d 100644
--- a/wikimetrics/controllers/authentication.py
+++ b/wikimetrics/controllers/authentication.py
@@ -61,7 +61,9 @@
Callback required by Flask-Login. Gets the User object from the database.
"""
db_session = db.get_session()
- return UserStore.get(db_session, user_id)
+ user = UserStore.get(db_session, user_id)
+ db_session.close()
+ return user
@login_manager.unauthorized_handler
@@ -86,6 +88,7 @@
if type(current_user) is UserStore:
current_user.logout(db_session)
logout_user()
+ db_session.close()
return redirect(url_for('home_index'))
@@ -146,16 +149,20 @@
db_session.commit()
except:
db_session.rollback()
+ db_session.close()
raise
except MultipleResultsFound:
flash('Multiple users found with your id!!! Contact
Administrator', 'error')
+ db_session.close()
return redirect(url_for('login'))
user.login(db_session)
if login_user(user):
user.detach_from(db_session)
del session['request_token']
+
+ db_session.close()
except Exception:
flash('You need to grant the app permissions in order to login.',
'error')
@@ -218,9 +225,11 @@
db_session.commit()
except:
db_session.rollback()
+ db_session.close()
raise
except MultipleResultsFound:
+ db_session.close()
return 'Multiple users found with your id!!! Contact
Administrator'
user.login(db_session)
@@ -229,6 +238,8 @@
redirect_to = session.get('next') or url_for('home_index')
redirect_to = urllib2.unquote(redirect_to)
return redirect(redirect_to)
+
+ db_session.close()
flash('Was not allowed to authenticate you with Google.', 'error')
return redirect(url_for('login'))
@@ -249,4 +260,5 @@
user =
db_session.query(UserStore).filter_by(email='[email protected]').one()
user.login(db_session)
login_user(user)
+ session.close()
return ''
diff --git a/wikimetrics/controllers/cohorts.py
b/wikimetrics/controllers/cohorts.py
index 0274629..4112cb8 100644
--- a/wikimetrics/controllers/cohorts.py
+++ b/wikimetrics/controllers/cohorts.py
@@ -47,6 +47,7 @@
"""
session = db.get_session()
tags = g.tag_service.get_all_tags(session)
+ session.close()
return render_template('cohorts.html', tags=json.dumps(tags))
@@ -59,6 +60,7 @@
else:
cohorts = g.cohort_service.get_list(db_session, current_user.id)
+ db_session.close()
return json_response(cohorts=[{
'id': c.id,
'name': c.name,
@@ -89,6 +91,8 @@
# don't need to roll back session because it's just a query
app.logger.exception(str(e))
return 'Error fetching membership for this cohort', 500
+ finally:
+ session.close()
@app.route('/cohorts/<string:cohort_id>/membership/delete', methods=['POST'])
@@ -108,6 +112,8 @@
except Exception as e:
app.logger.exception(str(e))
return json_error(e.message)
+ finally:
+ session.close()
@app.route('/cohorts/detail/<string:name_or_id>')
@@ -142,6 +148,8 @@
return 'You are not allowed to access this Cohort', 401
except NoResultFound:
return 'Could not find this Cohort', 404
+ finally:
+ db_session.close()
return json_response(cohort_dict)
@@ -242,6 +250,7 @@
name = request.args.get('name')
session = db.get_session()
available = g.cohort_service.get_cohort_by_name(session, name) is None
+ session.close()
return json.dumps(available)
@@ -271,6 +280,8 @@
return json_error('You are not allowed to access this cohort')
except NoResultFound:
return json_error('This cohort does not exist')
+ finally:
+ session.close()
def num_users(session, cohort_id):
@@ -327,6 +338,8 @@
except DatabaseError as e:
session.rollback()
return json_error(e.message)
+ finally:
+ session.close()
@app.route('/cohorts/<int:cohort_id>/tag/add/', defaults={'tag': None},
methods=['POST'])
@@ -370,6 +383,8 @@
except DatabaseError as e:
session.rollback()
return json_error(e.message)
+ finally:
+ session.close()
return json_response(data)
@@ -384,6 +399,7 @@
.filter(CohortTagStore.tag_id == TagStore.id) \
.all()
tag_names = [tag[0] for tag in tag_names]
+ session.close()
return json.dumps(sorted(tag_names))
@@ -397,4 +413,5 @@
session.commit()
tags = g.tag_service.get_all_tags(session)
+ session.close()
return json_response(message='success',
tagsAutocompleteList=json.dumps(tags))
diff --git a/wikimetrics/controllers/demo.py b/wikimetrics/controllers/demo.py
index c54384c..cb54e5c 100644
--- a/wikimetrics/controllers/demo.py
+++ b/wikimetrics/controllers/demo.py
@@ -15,8 +15,10 @@
def get_session_and_leave_open():
session = db.get_session()
session.query(ReportStore).all()
+ session.close()
session2 = db.get_session()
session2.query(ReportStore).all()
+ session.close()
return ''
@app.route('/demo/create/fake-<string:project>-users/<int:n>')
diff --git a/wikimetrics/controllers/reports.py
b/wikimetrics/controllers/reports.py
index a762faf..f9b6717 100644
--- a/wikimetrics/controllers/reports.py
+++ b/wikimetrics/controllers/reports.py
@@ -60,6 +60,7 @@
.one()[0]
data = report_result_json(result_key).data
+ db_session.close()
# call would throw an exception if report cannot be made public
ReportStore.make_report_public(
@@ -118,6 +119,7 @@
TaskErrorStore.task_type == 'report',
TaskErrorStore.task_type == None))\
.all()
+ db_session.close()
# TODO: update status for all reports at all times (not just show_in_ui
ones)
# update status for each report and build response
reports = []
@@ -187,8 +189,8 @@
pj = db_session.query(ReportStore)\
.filter(ReportStore.result_key == result_key)\
.one()
-
celery_task = Report.task.AsyncResult(pj.queue_result_key)
+ db_session.close()
return (celery_task, pj)
except NoResultFound:
# don't need to roll back session because it's just a query
@@ -252,6 +254,7 @@
break
user_names = g.cohort_service.get_wikiusernames_for_cohort(cohort_id,
session)
+ session.close()
return user_names
@@ -470,6 +473,7 @@
session = db.get_session()
report = session.query(ReportStore).get(report_id)
RunReport.rerun(report)
+ session.close()
return json_response(message='Report scheduled for rerun')
@@ -481,6 +485,7 @@
# if not db_report:
# return json_error('no task exists with id: {0}'.format(result_key))
# celery_task = Report.task.AsyncResult(db_report.result_key)
+# db_session.close()
# app.logger.debug('revoking task: %s', celery_task.id)
# from celery.task.control import revoke
# celery_task.revoke()
diff --git a/wikimetrics/forms/cohort_upload.py
b/wikimetrics/forms/cohort_upload.py
index 6543da5..41f79f9 100644
--- a/wikimetrics/forms/cohort_upload.py
+++ b/wikimetrics/forms/cohort_upload.py
@@ -67,6 +67,7 @@
if self.centralauth.data is True:
ca_session = db.get_ca_session()
records = g.centralauth_service.expand_via_centralauth(records,
ca_session)
+ ca_session.close()
self.records = records
diff --git a/wikimetrics/forms/validators.py b/wikimetrics/forms/validators.py
index dac6fdb..5f39cde 100644
--- a/wikimetrics/forms/validators.py
+++ b/wikimetrics/forms/validators.py
@@ -19,9 +19,9 @@
def __call__(self, form, field):
session = db.get_session()
-
if CohortService().get_cohort_by_name(session, field.data) is not None:
raise ValidationError('This cohort name is taken.')
+ session.close()
class CohortNameLegalCharacters(object):
diff --git a/wikimetrics/models/report_nodes/metric_report.py
b/wikimetrics/models/report_nodes/metric_report.py
index 01bfb8b..db85e87 100644
--- a/wikimetrics/models/report_nodes/metric_report.py
+++ b/wikimetrics/models/report_nodes/metric_report.py
@@ -35,6 +35,7 @@
str(WikiUserKey(key, self.project, self.cohort_id)) : value
for key, value in results_by_user.items()
}
+ session.close()
if not len(results):
results = {NO_RESULTS : self.metric.default_result}
return results
diff --git a/wikimetrics/models/report_nodes/report.py
b/wikimetrics/models/report_nodes/report.py
index 10c709e..5bd5b8b 100644
--- a/wikimetrics/models/report_nodes/report.py
+++ b/wikimetrics/models/report_nodes/report.py
@@ -122,6 +122,8 @@
except:
session.rollback()
raise
+ finally:
+ session.close()
def __repr__(self):
if self.persistent_id is not None:
@@ -145,6 +147,7 @@
if task_id:
pj.queue_result_key = task_id
session.commit()
+ session.close()
def run(self):
"""
@@ -233,6 +236,7 @@
pj.result_key = self.result_key
db_session.add(pj)
db_session.commit()
+ db_session.close()
merged = {self.result_key: results}
for child_result in child_results:
diff --git a/wikimetrics/models/report_nodes/run_program_metrics_report.py
b/wikimetrics/models/report_nodes/run_program_metrics_report.py
index 177d555..3cc854b 100644
--- a/wikimetrics/models/report_nodes/run_program_metrics_report.py
+++ b/wikimetrics/models/report_nodes/run_program_metrics_report.py
@@ -55,8 +55,9 @@
# First make sure this is a valid cohort
if cohort_store_object is not None and cohort_store_object.validated:
self.cohort = cohort_service.convert(cohort_store_object)
+ db_session = db.get_session()
validate_report = ValidateProgramMetricsReport(self.cohort,
- db.get_session(),
+ db_session,
user_id=self.user_id)
self.cohort.size = validate_report.unique_users
self.parameters = {
@@ -93,6 +94,7 @@
self.get_bytes_added_report()]
else:
self.children = [validate_report]
+ db_session.close()
return super(RunProgramMetricsReport, self).run()
else:
@@ -154,7 +156,10 @@
if self.public is False:
return
rs = ReportService()
- rs.write_report_to_file(self, results, db.get_session())
+
+ db_session = db.get_session()
+ rs.write_report_to_file(self, results, db_session)
+ db_session.close()
def get_aggregate_by_user_report(self, parameters):
metric_dict = parameters['metric']
diff --git a/wikimetrics/models/report_nodes/run_report.py
b/wikimetrics/models/report_nodes/run_report.py
index 720bd36..7a53df6 100644
--- a/wikimetrics/models/report_nodes/run_report.py
+++ b/wikimetrics/models/report_nodes/run_report.py
@@ -74,6 +74,7 @@
cohort_dict = parameters['cohort']
session = db.get_session()
cohort = cohort_service.get(session, user_id, by_id=cohort_dict['id'])
+ session.close()
parameters['cohort']['size'] = cohort.size
@@ -146,7 +147,9 @@
return
rs = ReportService()
- rs.write_report_to_file(self, results, db.get_session())
+ db_session = db.get_session()
+ rs.write_report_to_file(self, results, db_session)
+ db_session.close()
# TODO, this method belongs on a different class and it should not be a
class method
@classmethod
diff --git a/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
b/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
index b41813c..f13279c 100644
--- a/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
+++ b/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
@@ -32,6 +32,7 @@
service = CohortService()
session = db.get_session()
self.usernames = service.get_wikiusernames_for_cohort(cohort.id,
session)
+ session.close()
self.children = [
MultiProjectMetricReport(cohort, metric, *args, **kwargs)
diff --git a/wikimetrics/models/storage/cohort.py
b/wikimetrics/models/storage/cohort.py
index e396b29..9a4767c 100644
--- a/wikimetrics/models/storage/cohort.py
+++ b/wikimetrics/models/storage/cohort.py
@@ -54,6 +54,7 @@
wikiusers = self.filter_wikiuser_query(
db_session.query(WikiUserStore.mediawiki_userid)
).all()
+ db_session.close()
return (r.mediawiki_userid for r in wikiusers)
def __len__(self):
@@ -66,11 +67,13 @@
the number of users in this cohort
"""
db_session = db.get_session()
- return db_session.query(func.count(CohortWikiUserStore.id)) \
+ length = db_session.query(func.count(CohortWikiUserStore.id)) \
.join(WikiUserStore) \
.filter(CohortWikiUserStore.cohort_id == self.id) \
.filter(WikiUserStore.valid) \
.one()[0]
+ db_session.close()
+ return length
def group_by_project(self):
"""
@@ -90,6 +93,7 @@
user_id_projects = self.filter_wikiuser_query(
db_session.query(WikiUserStore.mediawiki_userid,
WikiUserStore.project)
).order_by(WikiUserStore.project).all()
+ db_session.close()
if not len(user_id_projects):
return [(self.default_project, None)]
diff --git a/wikimetrics/models/storage/report.py
b/wikimetrics/models/storage/report.py
index 4558be5..7882a4d 100644
--- a/wikimetrics/models/storage/report.py
+++ b/wikimetrics/models/storage/report.py
@@ -53,7 +53,10 @@
if not existing_session:
existing_session = db.get_session()
existing_session.add(self)
- existing_session.commit()
+ existing_session.commit()
+ existing_session.close()
+ else:
+ existing_session.commit()
@staticmethod
def update_reports(report_ids, owner_id, public=None, recurrent=None):
diff --git a/wikimetrics/models/storage/task_error.py
b/wikimetrics/models/storage/task_error.py
index 0c9a18c..54d2aa8 100644
--- a/wikimetrics/models/storage/task_error.py
+++ b/wikimetrics/models/storage/task_error.py
@@ -35,6 +35,7 @@
TaskErrorStore.update(db_session, existing, message, traceback)
else:
TaskErrorStore.create(db_session, task_type, task_id, message,
traceback)
+ db_session.close()
@staticmethod
def get(db_session, task_type, task_id):
diff --git a/wikimetrics/models/validate_cohort.py
b/wikimetrics/models/validate_cohort.py
index 546e0f7..a45fb1b 100644
--- a/wikimetrics/models/validate_cohort.py
+++ b/wikimetrics/models/validate_cohort.py
@@ -107,12 +107,15 @@
session.rollback()
app.logger.error(str(e))
return None
+ finally:
+ session.close()
def run(self):
session = db.get_session()
cohort = session.query(CohortStore).get(self.cohort_id)
cohort.validation_queue_key = current_task.request.id
session.commit()
+ session.close()
self.validate_records(session, cohort)
def validate_records(self, session, cohort):
diff --git a/wikimetrics/schedules/daily.py b/wikimetrics/schedules/daily.py
index 06c8306..fd63d94 100644
--- a/wikimetrics/schedules/daily.py
+++ b/wikimetrics/schedules/daily.py
@@ -67,6 +67,8 @@
task_logger.error('Problem running recurring reports: {}'.format(
traceback.format_exc()
))
+ finally:
+ session.close()
if queue.conf.get('DEBUG'):
@@ -78,3 +80,5 @@
session2 = db.get_session()
session2.query(ReportStore).first()
session.query(ReportStore).first()
+ session.close()
+ session2.close()
--
To view, visit https://gerrit.wikimedia.org/r/373078
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iad61bca63ccc6dabf306687f523ff65fff269875
Gerrit-PatchSet: 1
Gerrit-Project: analytics/wikimetrics
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits