http://www.mediawiki.org/wiki/Special:Code/MediaWiki/84633
Revision: 84633
Author: diederik
Date: 2011-03-23 21:38:31 +0000 (Wed, 23 Mar 2011)
Log Message:
-----------
Kaggle dataset creator almost finished.
Modified Paths:
--------------
trunk/tools/editor_trends/etl/enricher.py
Modified: trunk/tools/editor_trends/etl/enricher.py
===================================================================
--- trunk/tools/editor_trends/etl/enricher.py 2011-03-23 21:21:31 UTC (rev 84632)
+++ trunk/tools/editor_trends/etl/enricher.py 2011-03-23 21:38:31 UTC (rev 84633)
@@ -19,6 +19,7 @@
import bz2
+import os
import cStringIO
import hashlib
import codecs
@@ -27,6 +28,7 @@
import progressbar
from multiprocessing import JoinableQueue, Process, cpu_count, current_process
from xml.etree.cElementTree import fromstring, iterparse
+from lxml import objectify
from collections import deque
if '..' not in sys.path:
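The newly imported lxml.objectify exposes XML children as plain Python attributes, unlike the find()/fromstring() calls used elsewhere in this file; no objectify call appears in this diff yet, so it is presumably groundwork. A minimal sketch of what it offers, with a made-up XML snippet:

    from lxml import objectify

    page = objectify.fromstring('<page><title>Foo</title><id>42</id></page>')
    print page.title    # Foo -- child elements become attributes
    print page.id       # 42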
@@ -75,7 +77,16 @@
#109:'Book talk'
}
+class Statistics:
+ def __init__(self):
+ self.count_articles = 0
+ self.count_revisions = 0
+ def summary(self):
+ print 'Number of articles: %s' % self.count_articles
+ print 'Number of revisions: %s' % self.count_revisions
+
+
class Buffer:
def __init__(self, storage, id):
        assert storage == 'cassandra' or storage == 'mongo' or storage == 'csv', \
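The new Statistics class is a plain pair of counters plus a printer; usage is just attribute bumps, e.g. (a sketch):

    stats = Statistics()
    stats.count_articles += 1     # once per article that passes the namespace filter
    stats.count_revisions += 1    # once per revision seen
    stats.summary()
    # Number of articles: 1
    # Number of revisions: 1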
@@ -89,6 +100,7 @@
                           'title', 'timestamp', 'hash', 'revert', 'bot',
                           'prev_size', 'cur_size', 'delta']
self.setup_storage()
+ self.stats = Statistics()
def setup_storage(self):
if self.storage == 'cassandra':
@@ -111,7 +123,8 @@
self.stringify(revision)
id = revision['revision_id']
self.revisions[id] = revision
- if len(self.revisions) == 1000:
+ if len(self.revisions) > 1000:
+ print 'Emptying buffer'
self.store()
self.clear()
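The flush condition loosens from == 1000 to > 1000, presumably as a defensive change: self.revisions is a dict keyed by revision_id, so its size only moves in single steps, but a strict equality check leaves no recovery if the threshold is ever stepped over. The pattern in isolation (WriteBuffer and threshold are illustrative names, not from this commit):

    class WriteBuffer(object):
        def __init__(self, threshold=1000):
            self.rows = {}
            self.threshold = threshold

        def add(self, key, row):
            self.rows[key] = row
            if len(self.rows) > self.threshold:
                self.flush()

        def flush(self):
            # a real implementation would write self.rows to the backing
            # store here (cf. Buffer.store()); this sketch only resets
            self.rows = {}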
@@ -250,10 +263,10 @@
if title.startswith(namespace):
ns['namespace'] = namespaces[namespace]
if ns == {}:
- for namespace in NAMESPACE:
+ for namespace in NAMESPACE.values():
if title.startswith(namespace):
                ns = False #article does not belong to either the main namespace, user, talk or user talk namespace.
- break
+ return ns
ns['namespace'] = 0
else:
ns = False
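Two fixes land in determine_namespace() here: the fallback loop now iterates NAMESPACE.values() (the textual prefixes) instead of the dict's integer keys, and a matching foreign prefix now returns immediately rather than break-ing into the ns['namespace'] = 0 assignment below it. The intended flow as a standalone sketch (determine_ns_sketch and its arguments are illustrative names):

    def determine_ns_sketch(title, tracked, known_prefixes):
        # tracked: prefixes to keep, e.g. {'User:': 2, 'User talk:': 3}
        # known_prefixes: every recognised prefix, i.e. NAMESPACE.values()
        for prefix, number in tracked.items():
            if title.startswith(prefix):
                return {'namespace': number}
        for prefix in known_prefixes:
            if title.startswith(prefix):
                return False          # known but untracked namespace: skip
        return {'namespace': 0}       # no prefix matched: main namespace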
@@ -262,7 +275,7 @@
def prefill_row(title, article_id, namespace):
row = {}
- row['title'] = title.text
+ row['title'] = title
row['article_id'] = article_id
row.update(namespace)
return row
@@ -277,108 +290,93 @@
return revert
-def create_variables(result_queue, storage, id):
- bots = detector.retrieve_bots('en')
- buffer = Buffer(storage, id)
- i = 0
- while True:
- article = result_queue.get(block=True)
- result_queue.task_done()
- if article == None:
- break
- i += 1
- #article = fromstring(article)
- title = article['title'].text
- namespace = determine_namespace(title)
- if namespace != False:
- #revisions = article.findall('revision')
- article_id = article['id'].text
- hashes = deque(maxlen=1000)
- size = {}
- for revision in article['revision']:
- if revision == None:
- #the entire revision is empty, weird.
- continue
+def add_comment(revision_id, revision):
+ comment = {}
+ comment[revision_id] = revision.text
+ return comment
+
- contributor = revision.find('contributor')
- contributor = parse_contributor(contributor, bots)
- if not contributor:
- #editor is anonymous, ignore
- continue
+def create_variables(article, cache, cache_comments, bots):
- revision_id = revision.find('id')
- revision_id = extracter.extract_revision_id(revision_id)
- if revision_id == None:
- #revision_id is missing, which is weird
- continue
+ title = article['title'].text
+ namespace = determine_namespace(article['title'])
+
+ if namespace != False:
+ cache.stats.count_articles += 1
+ article_id = article['id'].text
+ hashes = deque()
+ size = {}
+ revisions = article['revisions']
+ for revision in revisions:
+ cache.stats.count_revisions += 1
+ if revision == None:
+ #the entire revision is empty, weird.
+ continue
+ contributor = revision.find('contributor')
+ contributor = parse_contributor(contributor, bots)
+ if not contributor:
+ #editor is anonymous, ignore
+ continue
- row = prefill_row(title, article_id, namespace)
- row['revision_id'] = revision_id
- text = extract_revision_text(revision)
- row.update(contributor)
+ revision_id = revision.find('id')
+ revision_id = extracter.extract_revision_id(revision_id)
+ if revision_id == None:
+ #revision_id is missing, which is weird
+ continue
+ comment = add_comment(revision_id, revision)
+ row = prefill_row(title, article_id, namespace)
+ row['revision_id'] = revision_id
+ text = extract_revision_text(revision)
+ row.update(contributor)
+ timestamp = revision.find('timestamp').text
+ row['timestamp'] = timestamp
- timestamp = revision.find('timestamp').text
- row['timestamp'] = timestamp
+ hash = create_md5hash(text)
+ revert = is_revision_reverted(hash['hash'], hashes)
+ hashes.append(hash['hash'])
+ size = calculate_delta_article_size(size, text)
- hash = create_md5hash(text)
- revert = is_revision_reverted(hash['hash'], hashes)
- hashes.append(hash['hash'])
- size = calculate_delta_article_size(size, text)
+ row.update(hash)
+ row.update(size)
+ row.update(revert)
+ cache.add(row)
+
- row.update(hash)
- row.update(size)
- row.update(revert)
- # print row
- # if row['username'] == None:
- # contributor = revision.find('contributor')
- # attrs = contributor.getchildren()
- # for attr in attrs:
- # print attr.text
- #print revision_id, hash, delta, prev_size\
- buffer.add(row)
- if i % 10000 == 0:
- print 'Parsed %s articles' % i
-# except ValueError, e:
-# print e
-# except UnicodeDecodeError, e:
-# print e
- buffer.empty()
- print 'Buffer is empty'
-
-def parse_xml(source, result_queue):
- context = iterparse(source, events=('end',))
+def parse_xml(buffer):
+ context = iterparse(buffer, events=('end',))
context = iter(context)
event, root = context.next()
article = {}
id = False
+ article[root.tag] = root
+ article['revisions'] = []
for event, elem in context:
if event == 'end' and elem.tag == 'revision':
- article[elem.tag] = elem
+ article['revisions'].append(elem)
elif event == 'end' and elem.tag == 'id' and id == False:
article[elem.tag] = elem
id = True
- article[root.tag] = root
- result_queue.put(article)
- root.clear()
+ return article
-def stream_raw_xml(input_queue, result_queue):
+
+def stream_raw_xml(input_queue, storage, id):
buffer = cStringIO.StringIO()
parsing = False
-
+ bots = detector.retrieve_bots('en')
+ cache = Buffer(storage, id)
+ cache_comments = Buffer(storage, id)
+ i = 0
while True:
filename = input_queue.get()
input_queue.task_done()
if filename == None:
break
- #filesize = file_utils.determine_filesize('', filename)
- #pbar = progressbar.ProgressBar().start()
-
for data in unzip(filename):
if data.startswith('<page>'):
parsing = True
@@ -387,14 +385,19 @@
buffer.write('\n')
if data == '</page>':
buffer.seek(0)
- parse_xml(buffer, result_queue)
+ article = parse_xml(buffer)
+ create_variables(article, cache, cache_comments, bots)
buffer = cStringIO.StringIO()
- #pbar.update(pbar.currval + len(data)) #is inaccurate!!!
+ i += 1
+ if i % 10000 == 0:
+ print 'Parsed %s articles' % i
-
- for x in xrange(cpu_count()):
- result_queue.put(None)
+
+ cache.empty()
+ cache_comments.empty()
+ print 'Buffer is empty'
print 'Finished parsing bz2 archives'
+ cache.stats.summary()
def debug():
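This hunk collapses the old two-stage pipeline, where parse_xml() fed whole articles onto a result_queue for separate create_variables workers, into a single pass: stream_raw_xml() now buffers each <page>...</page>, parses it, and calls create_variables() directly. One subtlety in the reworked parse_xml(): with events=('end',), the first event iterparse yields is the first element that closes, which inside a page buffer is <title>, not <page>, so article[root.tag] is keyed as 'title' and create_variables() can read article['title'].text. Demonstrated standalone (hypothetical XML):

    import cStringIO
    from xml.etree.cElementTree import iterparse

    xml = ('<page><title>Foo</title><id>42</id>'
           '<revision><id>7</id><text>hi</text></revision></page>')
    context = iter(iterparse(cStringIO.StringIO(xml), events=('end',)))
    event, first = context.next()
    print first.tag    # title -- the first closing tag, not page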
@@ -428,33 +431,34 @@
def launcher():
-
storage = 'csv'
setup(storage)
input_queue = JoinableQueue()
- result_queue = JoinableQueue()
    #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
-    files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
+    #files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
+    path = '/media/77fc623f-78c1-4f1e-be57-0f3043d778aa/wikipedia_dumps/batch1/'
+ files = file_utils.retrieve_file_list(path, 'bz2', mask=None)
for file in files:
- input_queue.put(file)
+ filename = os.path.join(path, file)
+ print filename
+ input_queue.put(filename)
for x in xrange(cpu_count()):
input_queue.put(None)
-    extracters = [Process(target=stream_raw_xml, args=[input_queue, result_queue])
+    extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, x])
for x in xrange(cpu_count())]
for extracter in extracters:
extracter.start()
-    creators = [Process(target=create_variables, args=[result_queue, storage, x])
- for x in xrange(cpu_count())]
- for creator in creators:
- creator.start()
+    #creators = [Process(target=create_variables, args=[result_queue, storage, x])
+ # for x in xrange(cpu_count())]
+ #for creator in creators:
+ # creator.start()
input_queue.join()
- result_queue.join()
if __name__ == '__main__':
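With the result_queue and the separate creator processes gone, launcher() is left with the plain sentinel-terminated pool pattern: the workers share one JoinableQueue of filenames, and one None per worker tells each process to exit. The pattern in isolation (worker and run are illustrative names):

    from multiprocessing import JoinableQueue, Process, cpu_count

    def worker(queue):
        while True:
            item = queue.get()
            queue.task_done()
            if item is None:          # sentinel: one was queued per worker
                break
            # ... process item, e.g. parse one bz2 dump file ...

    def run(items):
        queue = JoinableQueue()
        for item in items:
            queue.put(item)
        for _ in xrange(cpu_count()):
            queue.put(None)
        workers = [Process(target=worker, args=(queue,))
                   for _ in xrange(cpu_count())]
        for w in workers:
            w.start()
        queue.join()                  # returns once every task_done() is in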
_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs