http://www.mediawiki.org/wiki/Special:Code/MediaWiki/84633

Revision: 84633
Author:   diederik
Date:     2011-03-23 21:38:31 +0000 (Wed, 23 Mar 2011)
Log Message:
-----------
Kaggle dataset creator almost finished.

Modified Paths:
--------------
    trunk/tools/editor_trends/etl/enricher.py

Modified: trunk/tools/editor_trends/etl/enricher.py
===================================================================
--- trunk/tools/editor_trends/etl/enricher.py   2011-03-23 21:21:31 UTC (rev 84632)
+++ trunk/tools/editor_trends/etl/enricher.py   2011-03-23 21:38:31 UTC (rev 84633)
@@ -19,6 +19,7 @@
 
 
 import bz2
+import os
 import cStringIO
 import hashlib
 import codecs
@@ -27,6 +28,7 @@
 import progressbar
 from multiprocessing import JoinableQueue, Process, cpu_count, current_process
 from xml.etree.cElementTree import fromstring, iterparse
+from lxml import objectify
 from collections import deque
 
 if '..' not in sys.path:
@@ -75,7 +77,16 @@
     #109:'Book talk'
 }
 
+class Statistics:
+    def __init__(self):
+        self.count_articles = 0
+        self.count_revisions = 0
 
+    def summary(self):
+        print 'Number of articles: %s' % self.count_articles
+        print 'Number of revisions: %s' % self.count_revisions
+
+
 class Buffer:
     def __init__(self, storage, id):
         assert storage == 'cassandra' or storage == 'mongo' or storage == 
'csv', \
@@ -89,6 +100,7 @@
                      'title', 'timestamp', 'hash', 'revert', 'bot', 
'prev_size',
                      'cur_size', 'delta']
         self.setup_storage()
+        self.stats = Statistics()
 
     def setup_storage(self):
         if self.storage == 'cassandra':
@@ -111,7 +123,8 @@
         self.stringify(revision)
         id = revision['revision_id']
         self.revisions[id] = revision
-        if len(self.revisions) == 1000:
+        if len(self.revisions) > 1000:
+            print 'Emptying buffer'
             self.store()
             self.clear()
 
@@ -250,10 +263,10 @@
             if title.startswith(namespace):
                 ns['namespace'] = namespaces[namespace]
         if ns == {}:
-            for namespace in NAMESPACE:
+            for namespace in NAMESPACE.values():
                 if title.startswith(namespace):
                     ns = False #article does not belong to either the main 
namespace, user, talk or user talk namespace.
-                    break
+                    return ns
             ns['namespace'] = 0
     else:
         ns = False
@@ -262,7 +275,7 @@
 
 def prefill_row(title, article_id, namespace):
     row = {}
-    row['title'] = title.text
+    row['title'] = title
     row['article_id'] = article_id
     row.update(namespace)
     return row
@@ -277,108 +290,93 @@
     return revert
 
 
-def create_variables(result_queue, storage, id):
-    bots = detector.retrieve_bots('en')
-    buffer = Buffer(storage, id)
-    i = 0
-    while True:
-        article = result_queue.get(block=True)
-        result_queue.task_done()
-        if article == None:
-            break
-        i += 1
-        #article = fromstring(article)
-        title = article['title'].text
-        namespace = determine_namespace(title)
-        if namespace != False:
-            #revisions = article.findall('revision')
-            article_id = article['id'].text
-            hashes = deque(maxlen=1000)
-            size = {}
-            for revision in article['revision']:
-                if revision == None:
-                    #the entire revision is empty, weird. 
-                    continue
+def add_comment(revision_id, revision):
+    comment = {}
+    comment[revision_id] = revision.text
+    return comment
+    
 
-                contributor = revision.find('contributor')
-                contributor = parse_contributor(contributor, bots)
-                if not contributor:
-                    #editor is anonymous, ignore
-                    continue
+def create_variables(article, cache, cache_comments, bots):
 
-                revision_id = revision.find('id')
-                revision_id = extracter.extract_revision_id(revision_id)
-                if revision_id == None:
-                    #revision_id is missing, which is weird
-                    continue
+    title = article['title'].text
+    namespace = determine_namespace(article['title'])
+    
+    if namespace != False:
+        cache.stats.count_articles += 1
+        article_id = article['id'].text
+        hashes = deque()
+        size = {}
+        revisions = article['revisions']
+        for revision in revisions:
+            cache.stats.count_revisions += 1
+            if revision == None:
+                #the entire revision is empty, weird. 
+                continue
+            contributor = revision.find('contributor')
+            contributor = parse_contributor(contributor, bots)
+            if not contributor:
+                #editor is anonymous, ignore
+                continue
 
-                row = prefill_row(title, article_id, namespace)
-                row['revision_id'] = revision_id
-                text = extract_revision_text(revision)
-                row.update(contributor)
+            revision_id = revision.find('id')
+            revision_id = extracter.extract_revision_id(revision_id)
+            if revision_id == None:
+                #revision_id is missing, which is weird
+                continue
+            comment = add_comment(revision_id, revision)
+            row = prefill_row(title, article_id, namespace)
+            row['revision_id'] = revision_id
+            text = extract_revision_text(revision)
+            row.update(contributor)
 
+            timestamp = revision.find('timestamp').text
+            row['timestamp'] = timestamp
 
-                timestamp = revision.find('timestamp').text
-                row['timestamp'] = timestamp
+            hash = create_md5hash(text)
+            revert = is_revision_reverted(hash['hash'], hashes)
+            hashes.append(hash['hash'])
+            size = calculate_delta_article_size(size, text)
 
-                hash = create_md5hash(text)
-                revert = is_revision_reverted(hash['hash'], hashes)
-                hashes.append(hash['hash'])
-                size = calculate_delta_article_size(size, text)
+            row.update(hash)
+            row.update(size)
+            row.update(revert)
+            cache.add(row)
+    
 
-                row.update(hash)
-                row.update(size)
-                row.update(revert)
-    #           print row
-    #           if row['username'] == None:
-    #               contributor = revision.find('contributor')
-    #               attrs = contributor.getchildren()
-    #               for attr in attrs:
-    #                   print attr.text
-                #print revision_id, hash, delta, prev_size\
 
-                buffer.add(row)
-        if i % 10000 == 0:
-            print 'Parsed %s articles' % i
-#        except ValueError, e:
-#            print e
-#        except UnicodeDecodeError, e:
-#            print e
-    buffer.empty()
-    print 'Buffer is empty'
 
-
-def parse_xml(source, result_queue):
-    context = iterparse(source, events=('end',))
+def parse_xml(buffer):
+    context = iterparse(buffer, events=('end',))
     context = iter(context)
     event, root = context.next()
 
     article = {}
     id = False
+    article[root.tag] = root
+    article['revisions'] = []
     for event, elem in context:
         if event == 'end' and elem.tag == 'revision':
-            article[elem.tag] = elem
+            article['revisions'].append(elem)
         elif event == 'end' and elem.tag == 'id' and id == False:
             article[elem.tag] = elem
             id = True
-    article[root.tag] = root
-    result_queue.put(article)
-    root.clear()
 
+    return article
 
-def stream_raw_xml(input_queue, result_queue):
+
+def stream_raw_xml(input_queue, storage, id):
     buffer = cStringIO.StringIO()
     parsing = False
-
+    bots = detector.retrieve_bots('en')
+    cache = Buffer(storage, id)
+    cache_comments = Buffer(storage, id)
+    i = 0
     while True:
         filename = input_queue.get()
         input_queue.task_done()
         if filename == None:
             break
 
-        #filesize = file_utils.determine_filesize('', filename)
-        #pbar = progressbar.ProgressBar().start()
-
         for data in unzip(filename):
             if data.startswith('<page>'):
                 parsing = True
@@ -387,14 +385,19 @@
                 buffer.write('\n')
                 if data == '</page>':
                     buffer.seek(0)
-                    parse_xml(buffer, result_queue)
+                    article = parse_xml(buffer)
+                    create_variables(article, cache, cache_comments, bots)
                     buffer = cStringIO.StringIO()
-                #pbar.update(pbar.currval + len(data)) #is inaccurate!!!
+                i += 1
+                if i % 10000 == 0:
+                    print 'Parsed %s articles' % i
 
-
-    for x in xrange(cpu_count()):
-        result_queue.put(None)
+               
+    cache.empty()
+    cache_comments.empty()
+    print 'Buffer is empty'
     print 'Finished parsing bz2 archives'
+    cache.stats.summary()
 
 
 def debug():
@@ -428,33 +431,34 @@
 
 
 def launcher():
-
     storage = 'csv'
     setup(storage)
     input_queue = JoinableQueue()
-    result_queue = JoinableQueue()
     #files = 
['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
-    files = 
['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
+    #files = 
['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
+    path = 
'/media/77fc623f-78c1-4f1e-be57-0f3043d778aa/wikipedia_dumps/batch1/'
+    files = file_utils.retrieve_file_list(path, 'bz2', mask=None)
 
     for file in files:
-        input_queue.put(file)
+        filename = os.path.join(path, file)
+        print filename
+        input_queue.put(filename)
 
     for x in xrange(cpu_count()):
         input_queue.put(None)
 
-    extracters = [Process(target=stream_raw_xml, args=[input_queue, 
result_queue])
+    extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, 
x])
                   for x in xrange(cpu_count())]
     for extracter in extracters:
         extracter.start()
 
-    creators = [Process(target=create_variables, args=[result_queue, storage, 
x])
-                for x in xrange(cpu_count())]
-    for creator in creators:
-        creator.start()
+    #creators = [Process(target=create_variables, args=[result_queue, storage, 
x])
+    #            for x in xrange(cpu_count())]
+    #for creator in creators:
+    #    creator.start()
 
 
     input_queue.join()
-    result_queue.join()
 
 
 if __name__ == '__main__':


_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to