http://www.mediawiki.org/wiki/Special:Code/MediaWiki/84500

Revision: 84500
Author:   diederik
Date:     2011-03-21 23:28:54 +0000 (Mon, 21 Mar 2011)
Log Message:
-----------
Converted the XML parser for the Kaggle competition to a streaming parser;
hopefully no more out-of-memory errors.

Modified Paths:
--------------
    trunk/tools/editor_trends/etl/enricher.py

Modified: trunk/tools/editor_trends/etl/enricher.py
===================================================================
--- trunk/tools/editor_trends/etl/enricher.py   2011-03-21 23:27:08 UTC (rev 84499)
+++ trunk/tools/editor_trends/etl/enricher.py   2011-03-21 23:28:54 UTC (rev 84500)
@@ -26,7 +26,7 @@
 import sys
 import progressbar
 from multiprocessing import JoinableQueue, Process, cpu_count, current_process
-from xml.etree.cElementTree import fromstring
+from xml.etree.cElementTree import fromstring, iterparse
 from collections import deque
 
 if '..' not in sys.path:
@@ -287,15 +287,15 @@
         if article == None:
             break
         i += 1
-        article = fromstring(article)
-        title = article.find('title')
+        #article = fromstring(article)
+        title = article['title'].text
         namespace = determine_namespace(title)
         if namespace != False:
-            revisions = article.findall('revision')
-            article_id = article.find('id').text
+            #revisions = article.findall('revision')
+            article_id = article['id'].text
             hashes = deque(maxlen=1000)
             size = {}
-            for revision in revisions:
+            for revision in article['revision']:
                 if revision == None:
                     #the entire revision is empty, weird. 
                     continue
@@ -348,34 +348,66 @@
     print 'Buffer is empty'
 
 
-def create_article(input_queue, result_queue):
+def parse_xml(source, result_queue):
+    context = iterparse(source, events=('end',))
+    context = iter(context)
+    event, root = context.next()
+
+    article = {}
+    id = False
+    for event, elem in context:
+        if event == 'end' and elem.tag == 'revision':
+            article[elem.tag] = elem
+        elif event == 'end' and elem.tag == 'id' and id == False:
+            article[elem.tag] = elem
+            id = True
+    article[root.tag] = root
+    result_queue.put(article)
+    root.clear()
+
+
+def stream_raw_xml(input_queue, result_queue):
     buffer = cStringIO.StringIO()
     parsing = False
+
     while True:
         filename = input_queue.get()
         input_queue.task_done()
         if filename == None:
             break
+
         #filesize = file_utils.determine_filesize('', filename)
         #pbar = progressbar.ProgressBar().start()
+
         for data in unzip(filename):
             if data.startswith('<page>'):
                 parsing = True
             if parsing:
                 buffer.write(data)
+                buffer.write('\n')
                 if data == '</page>':
-                    xml_string = buffer.getvalue()
-                    if xml_string != None:
-                        result_queue.put(xml_string)
+                    buffer.seek(0)
+                    parse_xml(buffer, result_queue)
                     buffer = cStringIO.StringIO()
                 #pbar.update(pbar.currval + len(data)) #is inaccurate!!!
 
 
-    #for x in xrange(cpu_count()):
-    result_queue.put(None)
+    for x in xrange(cpu_count()):
+        result_queue.put(None)
     print 'Finished parsing bz2 archives'
 
 
+def debug():
+    input_queue = JoinableQueue()
+    result_queue = JoinableQueue()
+    files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
+
+    for file in files:
+        input_queue.put(file)
+
+    stream_raw_xml(input_queue, result_queue)
+
+
 def unzip(filename):
     '''
     Filename should be a fully qualified path to the bz2 file that will be 
@@ -410,7 +442,7 @@
     for x in xrange(cpu_count()):
         input_queue.put(None)
 
-    extracters = [Process(target=create_article, args=[input_queue, result_queue])
+    extracters = [Process(target=stream_raw_xml, args=[input_queue, result_queue])
                   for x in xrange(cpu_count())]
     for extracter in extracters:
         extracter.start()
@@ -426,5 +458,5 @@
 
 
 if __name__ == '__main__':
-    #extract_categories()
+    #debug()
     launcher()


_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to