http://www.mediawiki.org/wiki/Special:Code/MediaWiki/84500
Revision: 84500
Author: diederik
Date: 2011-03-21 23:28:54 +0000 (Mon, 21 Mar 2011)
Log Message:
-----------
Converted XML parser for Kaggle competition to streaming parser, hopefully no
more out of memory errors.
Modified Paths:
--------------
trunk/tools/editor_trends/etl/enricher.py
Modified: trunk/tools/editor_trends/etl/enricher.py
===================================================================
--- trunk/tools/editor_trends/etl/enricher.py	2011-03-21 23:27:08 UTC (rev 84499)
+++ trunk/tools/editor_trends/etl/enricher.py	2011-03-21 23:28:54 UTC (rev 84500)
@@ -26,7 +26,7 @@
import sys
import progressbar
from multiprocessing import JoinableQueue, Process, cpu_count, current_process
-from xml.etree.cElementTree import fromstring
+from xml.etree.cElementTree import fromstring, iterparse
from collections import deque
if '..' not in sys.path:
@@ -287,15 +287,15 @@
if article == None:
break
i += 1
- article = fromstring(article)
- title = article.find('title')
+ #article = fromstring(article)
+ title = article['title'].text
namespace = determine_namespace(title)
if namespace != False:
- revisions = article.findall('revision')
- article_id = article.find('id').text
+ #revisions = article.findall('revision')
+ article_id = article['id'].text
hashes = deque(maxlen=1000)
size = {}
- for revision in revisions:
+ for revision in article['revision']:
if revision == None:
#the entire revision is empty, weird.
continue
@@ -348,34 +348,66 @@
print 'Buffer is empty'
-def create_article(input_queue, result_queue):
+def parse_xml(source, result_queue):
+ context = iterparse(source, events=('end',))
+ context = iter(context)
+ event, root = context.next()
+
+ article = {}
+ id = False
+ for event, elem in context:
+ if event == 'end' and elem.tag == 'revision':
+ article[elem.tag] = elem
+ elif event == 'end' and elem.tag == 'id' and id == False:
+ article[elem.tag] = elem
+ id = True
+ article[root.tag] = root
+ result_queue.put(article)
+ root.clear()
+
+
+def stream_raw_xml(input_queue, result_queue):
buffer = cStringIO.StringIO()
parsing = False
+
while True:
filename = input_queue.get()
input_queue.task_done()
if filename == None:
break
+
#filesize = file_utils.determine_filesize('', filename)
#pbar = progressbar.ProgressBar().start()
+
for data in unzip(filename):
if data.startswith('<page>'):
parsing = True
if parsing:
buffer.write(data)
+ buffer.write('\n')
if data == '</page>':
- xml_string = buffer.getvalue()
- if xml_string != None:
- result_queue.put(xml_string)
+ buffer.seek(0)
+ parse_xml(buffer, result_queue)
buffer = cStringIO.StringIO()
#pbar.update(pbar.currval + len(data)) #is inaccurate!!!
- #for x in xrange(cpu_count()):
- result_queue.put(None)
+ for x in xrange(cpu_count()):
+ result_queue.put(None)
print 'Finished parsing bz2 archives'
+def debug():
+ input_queue = JoinableQueue()
+ result_queue = JoinableQueue()
+    files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
+
+ for file in files:
+ input_queue.put(file)
+
+ stream_raw_xml(input_queue, result_queue)
+
+
def unzip(filename):
'''
Filename should be a fully qualified path to the bz2 file that will be
@@ -410,7 +442,7 @@
for x in xrange(cpu_count()):
input_queue.put(None)
-    extracters = [Process(target=create_article, args=[input_queue, result_queue])
+    extracters = [Process(target=stream_raw_xml, args=[input_queue, result_queue])
for x in xrange(cpu_count())]
for extracter in extracters:
extracter.start()
@@ -426,5 +458,5 @@
if __name__ == '__main__':
- #extract_categories()
+ #debug()
launcher()
_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs