Revision: 366
Author: bslatkin
Date: Sat Jun 5 18:18:50 2010
Log: hub using mapper framework for FeedEntryRecord migration,
EventToDeliver cleanup
http://code.google.com/p/pubsubhubbub/source/detail?r=366
Added:
/trunk/hub/mapreduce.yaml
/trunk/hub/offline_jobs.py
Modified:
/trunk/hub
/trunk/hub/app.yaml
/trunk/hub/queue.yaml
=======================================
--- /dev/null
+++ /trunk/hub/mapreduce.yaml Sat Jun 5 18:18:50 2010
@@ -0,0 +1,26 @@
+mapreduce:
+# Job that names offline_jobs.RemoveOldFeedEntryRecordPropertiesMapper as its
+# mapper handler, with main.FeedEntryRecord as the default entity kind given
+# to the datastore input reader.
+- name: Remove old properties from FeedEntryRecords
+ mapper:
+ input_reader: mapreduce.input_readers.DatastoreInputReader
+ handler: offline_jobs.RemoveOldFeedEntryRecordPropertiesMapper
+ params:
+ # Default values for the job's parameters.
+ # NOTE(review): presumably overridable when the job is launched - confirm
+ # against the mapreduce library.
+ - name: entity_kind
+ default: main.FeedEntryRecord
+ - name: shard_count
+ default: 32
+ - name: processing_rate
+ default: 100000
+# Job that names offline_jobs.CleanupOldEventToDeliver.run as its mapper
+# handler, with main.EventToDeliver as the default entity kind given to the
+# datastore input reader.
+- name: Cleanup old EventToDeliver instances
+ mapper:
+ input_reader: mapreduce.input_readers.DatastoreInputReader
+ handler: offline_jobs.CleanupOldEventToDeliver.run
+ params:
+ - name: entity_kind
+ default: main.EventToDeliver
+ - name: shard_count
+ default: 32
+ - name: processing_rate
+ default: 100000
+ # Age threshold in days; validate_params converts it into the
+ # 'oldest_last_modified' cutoff that run() compares events against.
+ - name: age_days
+ default: 14
+ # NOTE(review): presumably invoked by the mapreduce library on the submitted
+ # params before the job starts - confirm against the library.
+ params_validator: offline_jobs.CleanupOldEventToDeliver.validate_params
=======================================
--- /dev/null
+++ /trunk/hub/offline_jobs.py Sat Jun 5 18:18:50 2010
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+#
+# Copyright 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Offline analysis jobs used with the hub."""
+
+import datetime
+import logging
+import time
+
+from google.appengine.ext import db
+
+from mapreduce import context
+from mapreduce import operation as op
+
+
+def RemoveOldFeedEntryRecordPropertiesMapper(feed_entry_record):
+ """Removes old properties from FeedEntryRecord instances.
+
+ Mapper handler; mapreduce.yaml in this change registers it with
+ main.FeedEntryRecord as the default entity kind.
+
+ Args:
+ feed_entry_record: The entity to strip the old properties from.
+
+ Yields:
+ An op.db.Put operation for feed_entry_record.
+ """
+ # Names of the old properties to strip from the entity.
+ OLD_PROPERTIES = (
+ 'entry_id_hash',
+ 'entry_id')
+ for name in OLD_PROPERTIES:
+ # Only delete attributes that are actually present on this entity.
+ if hasattr(feed_entry_record, name):
+ delattr(feed_entry_record, name)
+ # NOTE(review): indentation is flattened in this diff, so whether this Put
+ # sits inside or after the loop cannot be read off the text - confirm
+ # against the checked-in file.
+ yield op.db.Put(feed_entry_record)
+
+
+class CleanupOldEventToDeliver(object):
+ """Removes EventToDeliver instances older than a certain value.
+
+ validate_params turns the 'age_days' job parameter into an absolute cutoff
+ stored under 'oldest_last_modified'; run reads that cutoff back from the
+ mapper params and yields a delete for each event last modified before it.
+ """
+
+ @staticmethod
+ def validate_params(params):
+ """Checks the job parameters and records the deletion cutoff.
+
+ Args:
+ params: Dict-like job parameters; must contain 'age_days', a value that
+ int() accepts. Modified in place: 'oldest_last_modified' is set to the
+ current time minus age_days days, in seconds since the epoch.
+
+ Raises:
+ AssertionError: If 'age_days' is missing from params.
+ """
+ # NOTE(review): assert statements are skipped when Python runs with -O;
+ # consider an explicit raise if this check must always run.
+ assert 'age_days' in params
+ # 86400 is the number of seconds in one day.
+ params['oldest_last_modified'] = (
+ time.time() - (86400 * int(params['age_days'])))
+
+ def __init__(self):
+ # Cutoff as a datetime; filled in lazily by run() from the mapper params.
+ self.oldest_last_modified = None
+
+ def run(self, event):
+ """Yields a delete operation for event if it is older than the cutoff.
+
+ Args:
+ event: Entity whose last_modified value can be compared to a datetime;
+ mapreduce.yaml in this change sets the default kind to
+ main.EventToDeliver.
+
+ Yields:
+ An op.db.Delete operation for event when its last_modified is earlier
+ than the cutoff; otherwise nothing.
+ """
+ # While no cutoff is cached on this instance, read it from the mapper params
+ # of the mapreduce spec returned by context.get() and convert the epoch
+ # seconds into a naive UTC datetime.
+ if not self.oldest_last_modified:
+ params = context.get().mapreduce_spec.mapper.params
+ self.oldest_last_modified = datetime.datetime.utcfromtimestamp(
+ params['oldest_last_modified'])
+
+ if event.last_modified < self.oldest_last_modified:
+ yield op.db.Delete(event)
=======================================
--- /trunk/hub/app.yaml Fri Feb 26 11:54:31 2010
+++ /trunk/hub/app.yaml Sat Jun 5 18:18:50 2010
@@ -48,6 +48,11 @@
script: main.py
login: admin
+# Mapreduce for running offline jobs.
+- url: /mapreduce(/.*)?
+ script: mapreduce/main.py
+ login: admin
+
- url: .*
script: main.py
secure: optional
=======================================
--- /trunk/hub/queue.yaml Wed Sep 23 15:14:30 2009
+++ /trunk/hub/queue.yaml Sat Jun 5 18:18:50 2010
@@ -1,17 +1,19 @@
queue:
- - name: subscriptions
- rate: 0.5/s
- - name: polling
- rate: 0.5/s
- - name: feed-pulls
- rate: 3/s
- - name: feed-pulls-retries
- rate: 1/s
- - name: event-delivery
- rate: 3/s
- - name: event-delivery-retries
- rate: 1/s
- - name: mappings
- rate: 1/s
- - name: default
- rate: 0/s
+- name: subscriptions
+ rate: 1/s
+- name: polling
+ rate: 1/s
+- name: feed-pulls
+ rate: 5/s
+- name: feed-pulls-retries
+ rate: 1/s
+- name: event-delivery
+ rate: 5/s
+- name: event-delivery-retries
+ rate: 1/s
+- name: mappings
+ rate: 1/s
+- name: mapreduce
+ rate: 2/s
+- name: default
+ rate: 0/s