http://www.mediawiki.org/wiki/Special:Code/MediaWiki/82762
Revision: 82762
Author: diederik
Date: 2011-02-24 23:36:22 +0000 (Thu, 24 Feb 2011)
Log Message:
-----------
Generic XML parser that creates variables and can output to different databases
(Cassandra and Mongo).
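
The parser runs as a two-stage multiprocessing pipeline: create_article
workers stream <page> fragments out of the bz2 dumps onto a queue, and
create_variables workers turn each fragment into rows that a Buffer batches
towards the chosen backend. A minimal, self-contained sketch of that shape
(queue sentinels and worker counts as in the diff below; the parse and store
steps are reduced to placeholders):

    from multiprocessing import JoinableQueue, Process, cpu_count

    def create_article(input_queue, result_queue):
        # Producer: stream <page> fragments out of each dump file.
        while True:
            filename = input_queue.get()
            input_queue.task_done()
            if filename == None:
                break
            result_queue.put('<page>...</page>')  # placeholder fragment
        result_queue.put(None)  # one stop sentinel per producer

    def create_variables(result_queue):
        # Consumer: parse fragments and hand rows to storage.
        while True:
            article = result_queue.get()
            result_queue.task_done()
            if article == None:
                break

    def launcher(files):
        input_queue, result_queue = JoinableQueue(), JoinableQueue()
        for filename in files:
            input_queue.put(filename)
        for x in xrange(cpu_count()):
            input_queue.put(None)  # one stop sentinel per producer
        workers = [Process(target=create_article, args=(input_queue, result_queue))
                   for x in xrange(cpu_count())]
        workers += [Process(target=create_variables, args=(result_queue,))
                    for x in xrange(cpu_count())]
        for w in workers:
            w.start()
        input_queue.join()
        result_queue.join()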
Modified Paths:
--------------
trunk/tools/editor_trends/etl/enricher.py
trunk/tools/editor_trends/etl/extracter.py
Modified: trunk/tools/editor_trends/etl/enricher.py
===================================================================
--- trunk/tools/editor_trends/etl/enricher.py 2011-02-24 23:13:06 UTC (rev 82761)
+++ trunk/tools/editor_trends/etl/enricher.py 2011-02-24 23:36:22 UTC (rev 82762)
@@ -23,12 +23,64 @@
import hashlib
import codecs
import re
-from multiprocessing import JoinableQueue, Process
-#from xml.etree.cElementTree. import iterparse
-from xml.etree.cElementTree import fromstring
+import sys
+import progressbar
+from multiprocessing import JoinableQueue, Process, cpu_count
+from xml.etree.cElementTree import fromstring, dump
+from collections import deque
-RE_CATEGORY= re.compile('\(.*\`\,\.\-\:\'\)')
+try:
+ import pycassa
+except ImportError:
+ print 'I am not going to use Cassandra today, it\'s my off day.'
+if '..' not in sys.path:
+ sys.path.append('..')
+
+from database import db
+from bots import detector
+from utils import file_utils
+import extracter
+
+try:
+ import psyco
+ psyco.full()
+except ImportError:
+ print 'and psyco is having an off day as well...'
+
+RE_CATEGORY = re.compile('\(.*\`\,\.\-\:\'\)')
+
+
+class Buffer:
+ def __init__(self, storage):
+ assert storage == 'cassandra' or storage == 'mongo', \
+ 'Valid storage options are cassandra and mongo.'
+ self.storage = storage
+ self.revisions = []
+ self.setup_db()
+
+ def setup_db(self):
+ if self.storage == 'cassandra':
+ pass
+ else:
+ self.db = db.init_mongo_db('enwiki')
+ self.collection = self.db['kaggle']
+
+ def add(self, revision):
+ self.revisions.append(revision)
+ if len(self.revisions) == 100:
+ self.store()
+
+ def empty(self):
+ self.store()
+
+ def store(self):
+ if self.storage == 'cassandra':
+ print 'insert into cassandra'
+ else:
+ print 'insert into mongo'
+
+
def extract_categories():
'''
Field 1: page id
@@ -36,57 +88,117 @@
Field 3: sort key
Field 4: timestamp last change
'''
- filename = '/Users/diederik/Downloads/enwiki-20110115-categorylinks.sql'
+    filename = 'C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-categorylinks.sql'
output = codecs.open('categories.csv', 'w', encoding='utf-8')
fh = codecs.open(filename, 'r', encoding='utf-8')
-
- for line in fh:
- if line.startswith('INSERT INTO `categorylinks` VALUES ('):
- line = line.replace('INSERT INTO `categorylinks` VALUES (','')
- line = line.replace("'",'')
- categories = line.split('),(')
- for category in categories:
- category = category.split(',')
- if len(category) ==4:
- output.write('%s\t%s\n' % (category[0], category[1]))
-
+
+ try:
+ for line in fh:
+ if line.startswith('INSERT INTO `categorylinks` VALUES ('):
+ line = line.replace('INSERT INTO `categorylinks` VALUES (', '')
+ line = line.replace("'", '')
+ categories = line.split('),(')
+ for category in categories:
+ category = category.split(',')
+ if len(category) == 4:
+ output.write('%s\t%s\n' % (category[0], category[1]))
+ except UnicodeDecodeError, e:
+ print e
+
output.close()
fh.close()
+
def extract_revision_text(revision):
rev = revision.find('text')
if rev != None:
return rev.text.encode('utf-8')
else:
return None
-
+
+
def create_md5hash(revision):
- if revision == None:
- return False
rev = extract_revision_text(revision)
+ hash = {}
if rev != None:
m = hashlib.md5()
m.update(rev)
#echo m.digest()
- return m.hexdigest()
+ hash['hash'] = m.hexdigest()
else:
- return False
+ hash['hash'] = -1
+ return hash
-
+
def calculate_delta_article_size(prev_size, revision):
- if revision == None:
- return False
- rev= extract_revision_text(revision)
- if rev == None:
- return 0, prev_size
+ rev = extract_revision_text(revision)
+ size = {}
+ if prev_size == None:
+ size['prev_size'] = 0
+ size['cur_size'] = len(rev)
else:
+ size['prev_size'] = prev_size
delta = len(rev) - prev_size
prev_size = len(rev)
- return delta, prev_size
-
+ size['cur_size'] = delta
+ return size
-def create_variables(result_queue):
+def parse_contributor(contributor, bots):
+ username = extracter.extract_username(contributor)
+ user_id = extracter.extract_contributor_id(contributor)
+ bot = extracter.determine_username_is_bot(contributor, bots=bots)
+ contributor = {}
+ contributor['username'] = username
+ contributor['bot'] = bot
+ if user_id != None:
+ contributor.update(user_id)
+ else:
+        contributor = False
+ return contributor
+
+
+def determine_namespace(title):
+ namespaces = {'User': 2,
+ 'Talk': 1,
+ 'User Talk': 3,
+ }
+ ns = {}
+ if title.text != None:
+ title = title.text
+ title = title.split(':')
+ if len(title) == 1:
+ ns['namespace'] = 0
+ elif len(title) == 2:
+ if title[0] in namespaces:
+ ns['namespace'] = namespaces[title[0]]
+ else:
+                ns = False #article does not belong to the main, user, talk, or user talk namespace.
+ else:
+ ns = False
+ return ns
+
+
+def prefill_row(title, article_id, namespace):
+ row = {}
+ row['title'] = title.text
+ row['article_id'] = article_id
+ row.update(namespace)
+ return row
+
+
+def is_revision_reverted(hash_cur, hashes):
+ revert = {}
+ if hash_cur in hashes:
+ revert['revert'] = 1
+ else:
+ revert['revert'] = 0
+ return revert
+
+
+def create_variables(result_queue, storage):
+ bots = detector.retrieve_bots('en')
+ buffer = Buffer(storage)
while True:
try:
article = result_queue.get(block=True)
@@ -94,26 +206,56 @@
if article == None:
break
article = fromstring(article)
- prev_size = 0
- revisions = article.findall('revision')
- for revision in revisions:
- revision_id = revision.find('id').text
- hash = create_md5hash(revision)
-            delta, prev_size = calculate_delta_article_size(prev_size, revision)
- print revision_id, hash, delta, prev_size
+ title = article.find('title')
+ namespace = determine_namespace(title)
+ if namespace != False:
+ prev_size = None
+ revisions = article.findall('revision')
+ article_id = article.find('id').text
+ hashes = deque(maxlen=100)
+ for revision in revisions:
+ row = prefill_row(title, article_id, namespace)
+ if revision == None:
+ continue
+
+ contributor = revision.find('contributor')
+ contributor = parse_contributor(contributor, bots)
+ if not contributor:
+ #editor is anonymous, ignore
+ continue
+
+ row.update(contributor)
+ revision_id = revision.find('id')
+ revision_id = extracter.extract_revision_id(revision_id)
+ row['revision_id'] = revision_id
+
+
+ hash = create_md5hash(revision)
+ revert = is_revision_reverted(hash['hash'], hashes)
+ hashes.append(hash['hash'])
+ size = calculate_delta_article_size(prev_size, revision)
+
+ row.update(hash)
+ row.update(size)
+ row.update(revert)
+ print row
+ # if row['username'] == None:
+ # contributor = revision.find('contributor')
+ # attrs = contributor.getchildren()
+ # for attr in attrs:
+ # print attr.text
+ #print revision_id, hash, delta, prev_size\
+
+ buffer.add(row)
+
except ValueError, e:
- pass
- #print e
-
+ print e
+ except UnicodeDecodeError, e:
+ print e
+ buffer.empty()
def create_article(input_queue, result_queue):
- '''
- This function creates three variables:
- 1) a MD5 hash for each revision
- 2) the size of the current revision
- 3) the delta size of the current revision and the previous revision
- '''
buffer = cStringIO.StringIO()
parsing = False
while True:
@@ -121,29 +263,30 @@
input_queue.task_done()
if filename == None:
break
-
+ filesize = file_utils.determine_filesize('', filename)
+ pbar = progressbar.ProgressBar(maxval=filesize).start()
for data in unzip(filename):
if data.startswith('<page>'):
parsing = True
- #print data
if parsing:
buffer.write(data)
if data == '</page>':
- xml1 = buffer.getvalue()
- #xml1 = xml1.decode('utf-8')
- #xml1 = xml1.encode('utf-8')
- #xml1 = fromstring(xml1)
- if xml1 != None:
- result_queue.put(xml1)
+ xml_string = buffer.getvalue()
+ if xml_string != None:
+ result_queue.put(xml_string)
buffer = cStringIO.StringIO()
-
- result_queue.put(None)
+ pbar.update(pbar.currval + len(data)) #is inaccurate!!!
+
+
+ result_queue.put(None)
print 'Finished parsing bz2 archives'
+
def unzip(filename):
'''
-    Filename should be a fully qualified path to the bz2 file that will be decompressed.
- It will iterate line by line and yield this back to create_article
+ Filename should be a fully qualified path to the bz2 file that will be
+ decompressed. It will iterate line by line and yield this back to
+ create_article
'''
fh = bz2.BZ2File(filename, 'r')
for line in fh:
@@ -156,27 +299,29 @@
def launcher():
input_queue = JoinableQueue()
result_queue = JoinableQueue()
-    files = ['/Users/diederik/Downloads/enwiki-20110115-pages-articles1.xml.bz2']
+ storage = 'cassandra'
+    files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
for file in files:
input_queue.put(file)
-
- for x in xrange(2):
+
+    for x in xrange(cpu_count()):
input_queue.put(None)
-
-    extracters = [Process(target=create_article, args=[input_queue, result_queue]) for x in xrange(2)]
+
+    extracters = [Process(target=create_article, args=[input_queue, result_queue])
+                  for x in xrange(cpu_count())]
for extracter in extracters:
extracter.start()
-
-    creators = [Process(target=create_variables, args=[result_queue]) for x in xrange(2)]
+
+ creators = [Process(target=create_variables, args=[result_queue, storage])
+                for x in xrange(cpu_count())]
for creator in creators:
creator.start()
-
-
+
+
input_queue.join()
result_queue.join()
-
-
+
if __name__ == '__main__':
- extract_categories()
- #launcher()
\ No newline at end of file
+ #extract_categories()
+ launcher()
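
The revert detection added above rests on one idea: keep a sliding window of
the last 100 revision-text MD5 digests per article (the deque(maxlen=100)),
and flag any revision whose digest already occurs in the window, since its
text is byte-identical to an earlier version. The mechanism in isolation, as
a rough sketch with plain strings standing in for revision XML:

    import hashlib
    from collections import deque

    def detect_reverts(revision_texts, window=100):
        # Yield (digest, revert flag) per revision; a revision counts as a
        # revert when its digest matches one of the previous `window` digests.
        hashes = deque(maxlen=window)
        for text in revision_texts:
            digest = hashlib.md5(text).hexdigest()
            yield digest, 1 if digest in hashes else 0
            hashes.append(digest)

    # The third text restores the first, so it is flagged as a revert:
    for digest, revert in detect_reverts(['clean', 'clean + spam', 'clean']):
        print digest[:8], revert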
Modified: trunk/tools/editor_trends/etl/extracter.py
===================================================================
--- trunk/tools/editor_trends/etl/extracter.py 2011-02-24 23:13:06 UTC (rev 82761)
+++ trunk/tools/editor_trends/etl/extracter.py 2011-02-24 23:36:22 UTC (rev 82762)
@@ -153,6 +153,13 @@
return None
+def extract_revision_id(revision_id, **kwargs):
+ if revision_id != None:
+ return revision_id.text
+ else:
+ return None
+
+
def extract_contributor_id(contributor, **kwargs):
'''
@contributor is the xml contributor node containing a number of attributes
@@ -184,18 +191,19 @@
the variable tags determines which attributes are being parsed, the values
in this dictionary are the functions used to extract the data.
'''
- headers = ['id', 'date', 'article', 'username']
+ headers = ['id', 'date', 'article', 'username', 'revision_id']
tags = {'contributor': {'id': extract_contributor_id,
'bot': determine_username_is_bot,
'username': extract_username,
},
'timestamp': {'date': wikitree.parser.extract_text},
+ 'id': {'revision_id': extract_revision_id,
+ }
}
vars = {}
flat = []
for x, revision in enumerate(revisions):
- #print len(revision.getchildren())
vars[x] = {}
vars[x]['article'] = page
for tag in tags:
@@ -208,7 +216,6 @@
f = tags[tag][function]
value = f(el, bots=bots)
if isinstance(value, dict):
- #if type(value) == type({}):
for kw in value:
vars[x][kw] = value[kw]
else:
@@ -237,7 +244,7 @@
return output
-def parse_dumpfile(tasks, rts, filehandles, lock):
+def parse_dumpfile(tasks, rts, lock):
bot_ids = detector.retrieve_bots(rts.language.code)
    location = os.path.join(rts.input_location, rts.language.code, rts.project.name)
    output = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt')
@@ -351,6 +358,7 @@
files = file_utils.retrieve_file_list(properties.location,
extension,
mask=canonical_filename)
+ print properties.location
print 'Checking if dump file has been extracted...'
for fn in files:
file_without_ext = fn.replace('%s%s' % ('.', extension), '')
@@ -369,7 +377,6 @@
print 'There was an error while extracting %s, please make sure \
that %s is valid archive.' % (fn, fn)
return False
- print tasks.qsize()
return tasks
@@ -392,14 +399,10 @@
return result
lock = multiprocessing.Lock()
-# filehandles = [file_utils.create_txt_filehandle(output, '%s.csv' % fh, 'a',
-# settings.encoding) for fh in xrange(settings.max_filehandles)]
- filehandles = []
consumers = [multiprocessing.Process(target=parse_dumpfile,
args=(tasks,
rts,
- filehandles,
lock))
for x in xrange(rts.number_of_processes)]
@@ -411,11 +414,11 @@
w.start()
tasks.join()
- filehandles = [fh.close() for fh in filehandles]
- result = sum([consumer.exitcode for consumer in consumers])
+ result = sum([consumer.exitcode for consumer in consumers
+ if consumer.exitcode != None])
- if restult == 0:
+ if result == 0:
return True
else:
return False
@@ -427,5 +430,6 @@
filename = 'svwiki-latest-stub-meta-history.xml'
parse_dumpfile(project, filename, language_code)
+
if __name__ == '__main__':
debug()
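
The headers/tags change in this file extends a small dispatch table: each XML
tag name maps output variable names to the extractor function that produces
them, and an extractor may return a dict of several variables that gets merged
into the row. A stripped-down sketch of the pattern (extract_text here is a
stand-in for the module's real extractor functions):

    from xml.etree.cElementTree import fromstring

    def extract_text(el, **kwargs):
        return el.text

    # tag name -> {output variable: extractor function}
    tags = {'timestamp': {'date': extract_text},
            'id': {'revision_id': extract_text}}

    def parse_revision(revision, tags):
        row = {}
        for el in revision.getchildren():
            for var, f in tags.get(el.tag, {}).items():
                value = f(el)
                if isinstance(value, dict):
                    row.update(value)  # extractor returned several variables
                else:
                    row[var] = value
        return row

    rev = fromstring('<revision><id>42</id>'
                     '<timestamp>2011-02-24</timestamp></revision>')
    print parse_revision(rev, tags)
    # {'date': '2011-02-24', 'revision_id': '42'}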
_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs