http://www.mediawiki.org/wiki/Special:Code/MediaWiki/82762

Revision: 82762
Author:   diederik
Date:     2011-02-24 23:36:22 +0000 (Thu, 24 Feb 2011)
Log Message:
-----------
Generic XML parser that creates variables and can output to different databases 
(Cassandra and Mongo). 

Modified Paths:
--------------
    trunk/tools/editor_trends/etl/enricher.py
    trunk/tools/editor_trends/etl/extracter.py

Modified: trunk/tools/editor_trends/etl/enricher.py
===================================================================
--- trunk/tools/editor_trends/etl/enricher.py   2011-02-24 23:13:06 UTC (rev 82761)
+++ trunk/tools/editor_trends/etl/enricher.py   2011-02-24 23:36:22 UTC (rev 82762)
@@ -23,12 +23,64 @@
 import hashlib
 import codecs
 import re
-from multiprocessing import JoinableQueue, Process
-#from xml.etree.cElementTree. import iterparse
-from xml.etree.cElementTree import fromstring
+import sys
+import progressbar
+from multiprocessing import JoinableQueue, Process, cpu_count
+from xml.etree.cElementTree import fromstring, dump
+from collections import deque
 
-RE_CATEGORY= re.compile('\(.*\`\,\.\-\:\'\)')
+try:
+    import pycassa
+except ImportError:
+    print 'I am not going to use Cassandra today, it\'s my off day.'
 
+if '..' not in sys.path:
+    sys.path.append('..')
+
+from database import db
+from bots import detector
+from utils import file_utils
+import extracter
+
+try:
+    import psyco
+    psyco.full()
+except ImportError:
+    print 'and psyco is having an off day as well...'
+
+RE_CATEGORY = re.compile('\(.*\`\,\.\-\:\'\)')
+
+
+class Buffer:
+    def __init__(self, storage):
+        assert storage == 'cassandra' or storage == 'mongo', \
+            'Valid storage options are cassandra and mongo.'
+        self.storage = storage
+        self.revisions = []
+        self.setup_db()
+
+    def setup_db(self):
+        if self.storage == 'cassandra':
+            pass
+        else:
+            self.db = db.init_mongo_db('enwiki')
+            self.collection = self.db['kaggle']
+
+    def add(self, revision):
+        self.revisions.append(revision)
+        if len(self.revisions) == 100:
+            self.store()
+
+    def empty(self):
+        self.store()
+
+    def store(self):
+        if self.storage == 'cassandra':
+            print 'insert into cassandra'
+        else:
+            print 'insert into mongo'
+
+
 def extract_categories():
     '''
     Field 1: page id
@@ -36,57 +88,117 @@
     Field 3: sort key
     Field 4: timestamp last change
     '''
-    filename = '/Users/diederik/Downloads/enwiki-20110115-categorylinks.sql'
+    filename = 'C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-categorylinks.sql'
     output = codecs.open('categories.csv', 'w', encoding='utf-8')
     fh = codecs.open(filename, 'r', encoding='utf-8')
-    
-    for line in fh:
-        if line.startswith('INSERT INTO `categorylinks` VALUES ('):
-            line = line.replace('INSERT INTO `categorylinks` VALUES (','')
-            line = line.replace("'",'')
-            categories = line.split('),(')
-            for category in categories:
-                category = category.split(',')
-                if len(category) ==4:
-                    output.write('%s\t%s\n' % (category[0], category[1]))
-    
+
+    try:
+        for line in fh:
+            if line.startswith('INSERT INTO `categorylinks` VALUES ('):
+                line = line.replace('INSERT INTO `categorylinks` VALUES (', '')
+                line = line.replace("'", '')
+                categories = line.split('),(')
+                for category in categories:
+                    category = category.split(',')
+                    if len(category) == 4:
+                        output.write('%s\t%s\n' % (category[0], category[1]))
+    except UnicodeDecodeError, e:
+        print e
+
     output.close()
     fh.close()
 
+
 def extract_revision_text(revision):
     rev = revision.find('text')
     if rev != None:
         return rev.text.encode('utf-8')
     else:
         return None
-    
+
+
 def create_md5hash(revision):
-    if revision == None:
-        return False
     rev = extract_revision_text(revision)
+    hash = {}
     if rev != None:
         m = hashlib.md5()
         m.update(rev)
         #echo m.digest()
-        return m.hexdigest()
+        hash['hash'] = m.hexdigest()
     else:
-        return False
+        hash['hash'] = -1
+    return hash
 
-    
+
 def calculate_delta_article_size(prev_size, revision):
-    if revision == None:
-        return False
-    rev= extract_revision_text(revision)
-    if rev == None:
-        return 0, prev_size
+    rev = extract_revision_text(revision)
+    size = {}
+    if prev_size == None:
+        size['prev_size'] = 0
+        size['cur_size'] = len(rev)
     else:
+        size['prev_size'] = prev_size
         delta = len(rev) - prev_size
         prev_size = len(rev)
-        return delta, prev_size
-        
+        size['cur_size'] = delta
+    return size
 
 
-def create_variables(result_queue):
+def parse_contributor(contributor, bots):
+    username = extracter.extract_username(contributor)
+    user_id = extracter.extract_contributor_id(contributor)
+    bot = extracter.determine_username_is_bot(contributor, bots=bots)
+    contributor = {}
+    contributor['username'] = username
+    contributor['bot'] = bot
+    if user_id != None:
+        contributor.update(user_id)
+    else:
+        contribuor = False
+    return contributor
+
+
+def determine_namespace(title):
+    namespaces = {'User': 2,
+                  'Talk': 1,
+                  'User Talk': 3,
+                  }
+    ns = {}
+    if title.text != None:
+        title = title.text
+        title = title.split(':')
+        if len(title) == 1:
+           ns['namespace'] = 0
+        elif len(title) == 2:
+            if title[0] in namespaces:
+                ns['namespace'] = namespaces[title[0]]
+            else:
+                ns = False #article does not belong to either the main namespace, user, talk or user talk namespace.
+    else:
+        ns = False
+    return ns
+
+
+def prefill_row(title, article_id, namespace):
+    row = {}
+    row['title'] = title.text
+    row['article_id'] = article_id
+    row.update(namespace)
+    return row
+
+
+def is_revision_reverted(hash_cur, hashes):
+    revert = {}
+    if hash_cur in hashes:
+        revert['revert'] = 1
+    else:
+        revert['revert'] = 0
+    return revert
+
+
+def create_variables(result_queue, storage):
+    bots = detector.retrieve_bots('en')
+    buffer = Buffer(storage)
     while True:
         try:
             article = result_queue.get(block=True)
@@ -94,26 +206,56 @@
             if article == None:
                 break
             article = fromstring(article)
-            prev_size = 0
-            revisions = article.findall('revision')
-            for revision in revisions:
-                revision_id = revision.find('id').text
-                hash = create_md5hash(revision)
-                delta, prev_size = calculate_delta_article_size(prev_size, revision)
-                print revision_id, hash, delta, prev_size
+            title = article.find('title')
+            namespace = determine_namespace(title)
+            if namespace != False:
+                prev_size = None
+                revisions = article.findall('revision')
+                article_id = article.find('id').text
+                hashes = deque(maxlen=100)
+                for revision in revisions:
+                    row = prefill_row(title, article_id, namespace)
+                    if revision == None:
+                        continue
+
+                    contributor = revision.find('contributor')
+                    contributor = parse_contributor(contributor, bots)
+                    if not contributor:
+                        #editor is anonymous, ignore
+                        continue
+
+                    row.update(contributor)
+                    revision_id = revision.find('id')
+                    revision_id = extracter.extract_revision_id(revision_id)
+                    row['revision_id'] = revision_id
+
+
+                    hash = create_md5hash(revision)
+                    revert = is_revision_reverted(hash['hash'], hashes)
+                    hashes.append(hash['hash'])
+                    size = calculate_delta_article_size(prev_size, revision)
+
+                    row.update(hash)
+                    row.update(size)
+                    row.update(revert)
+                    print row
+    #                if row['username'] == None:
+    #                    contributor = revision.find('contributor')
+    #                    attrs = contributor.getchildren()
+    #                    for attr in attrs:
+    #                        print attr.text
+                    #print revision_id, hash, delta, prev_size\
+
+                    buffer.add(row)
+
         except ValueError, e:
-            pass
-            #print e
-                
+            print e
+        except UnicodeDecodeError, e:
+            print e
+    buffer.empty()
 
 
 def create_article(input_queue, result_queue):
-    '''
-    This function creates three variables:
-    1) a MD5 hash for each revision
-    2) the size of the current revision
-    3) the delta size of the current revision and the previous revision
-    '''
     buffer = cStringIO.StringIO()
     parsing = False
     while True:
@@ -121,29 +263,30 @@
         input_queue.task_done()
         if filename == None:
             break
-
+        filesize = file_utils.determine_filesize('', filename)
+        pbar = progressbar.ProgressBar(maxval=filesize).start()
         for data in unzip(filename):
             if data.startswith('<page>'):
                 parsing = True
-            #print data
             if parsing:
                 buffer.write(data)
                 if data == '</page>':
-                    xml1 = buffer.getvalue()
-                    #xml1 = xml1.decode('utf-8')
-                    #xml1 = xml1.encode('utf-8')
-                    #xml1 = fromstring(xml1)
-                    if xml1 != None:
-                        result_queue.put(xml1)
+                    xml_string = buffer.getvalue()
+                    if xml_string != None:
+                        result_queue.put(xml_string)
                     buffer = cStringIO.StringIO()
-    
-    result_queue.put(None)                
+                pbar.update(pbar.currval + len(data)) #is inaccurate!!!
+
+
+    result_queue.put(None)
     print 'Finished parsing bz2 archives'
 
+
 def unzip(filename):
     '''
-    Filename should be a fully qualified path to the bz2 file that will be decompressed.
-    It will iterate line by line and yield this back to create_article
+    Filename should be a fully qualified path to the bz2 file that will be 
+    decompressed. It will iterate line by line and yield this back to 
+    create_article
     '''
     fh = bz2.BZ2File(filename, 'r')
     for line in fh:
@@ -156,27 +299,29 @@
 def launcher():
     input_queue = JoinableQueue()
     result_queue = JoinableQueue()
-    files = ['/Users/diederik/Downloads/enwiki-20110115-pages-articles1.xml.bz2']
+    storage = 'cassandra'
+    files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
     for file in files:
         input_queue.put(file)
-    
-    for x in xrange(2):
+
+    for x in xrange(cpu_count):
         input_queue.put(None)
-    
-    extracters = [Process(target=create_article, args=[input_queue, result_queue]) for x in xrange(2)]
+
+    extracters = [Process(target=create_article, args=[input_queue, result_queue])
+                  for x in xrange(cpu_count)]
     for extracter in extracters:
         extracter.start()
-    
-    creators = [Process(target=create_variables, args=[result_queue]) for x in xrange(2)]
+
+    creators = [Process(target=create_variables, args=[result_queue, storage])
+                for x in xrange(cpu_count)]
     for creator in creators:
         creator.start()
-    
-    
+
+
     input_queue.join()
     result_queue.join()
-    
-    
 
+
 if __name__ == '__main__':
-    extract_categories()
-    #launcher()
\ No newline at end of file
+    #extract_categories()
+    launcher()

Modified: trunk/tools/editor_trends/etl/extracter.py
===================================================================
--- trunk/tools/editor_trends/etl/extracter.py  2011-02-24 23:13:06 UTC (rev 82761)
+++ trunk/tools/editor_trends/etl/extracter.py  2011-02-24 23:36:22 UTC (rev 82762)
@@ -153,6 +153,13 @@
         return None
 
 
+def extract_revision_id(revision_id, **kwargs):
+    if revision_id != None:
+        return revision_id.text
+    else:
+        return None
+
+
 def extract_contributor_id(contributor, **kwargs):
     '''
     @contributor is the xml contributor node containing a number of attributes
@@ -184,18 +191,19 @@
     the variable tags determines which attributes are being parsed, the values 
     in this dictionary are the functions used to extract the data. 
     '''
-    headers = ['id', 'date', 'article', 'username']
+    headers = ['id', 'date', 'article', 'username', 'revision_id']
     tags = {'contributor': {'id': extract_contributor_id,
                             'bot': determine_username_is_bot,
                             'username': extract_username,
                             },
             'timestamp': {'date': wikitree.parser.extract_text},
+            'id': {'revision_id': extract_revision_id,
+                   }
             }
     vars = {}
     flat = []
 
     for x, revision in enumerate(revisions):
-        #print len(revision.getchildren())
         vars[x] = {}
         vars[x]['article'] = page
         for tag in tags:
@@ -208,7 +216,6 @@
                 f = tags[tag][function]
                 value = f(el, bots=bots)
                 if isinstance(value, dict):
-                #if type(value) == type({}):
                     for kw in value:
                         vars[x][kw] = value[kw]
                 else:
@@ -237,7 +244,7 @@
     return output
 
 
-def parse_dumpfile(tasks, rts, filehandles, lock):
+def parse_dumpfile(tasks, rts, lock):
     bot_ids = detector.retrieve_bots(rts.language.code)
     location = os.path.join(rts.input_location, rts.language.code, rts.project.name)
     output = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt')
@@ -351,6 +358,7 @@
     files = file_utils.retrieve_file_list(properties.location,
                                      extension,
                                      mask=canonical_filename)
+    print properties.location
     print 'Checking if dump file has been extracted...'
     for fn in files:
         file_without_ext = fn.replace('%s%s' % ('.', extension), '')
@@ -369,7 +377,6 @@
             print 'There was an error while extracting %s, please make sure \
             that %s is valid archive.' % (fn, fn)
             return False
-    print tasks.qsize()
     return tasks
 
 
@@ -392,14 +399,10 @@
         return result
 
     lock = multiprocessing.Lock()
-#    filehandles = [file_utils.create_txt_filehandle(output, '%s.csv' % fh, 'a',
-#                settings.encoding) for fh in xrange(settings.max_filehandles)]
 
-    filehandles = []
     consumers = [multiprocessing.Process(target=parse_dumpfile,
                                 args=(tasks,
                                       rts,
-                                      filehandles,
                                       lock))
                                 for x in xrange(rts.number_of_processes)]
 
@@ -411,11 +414,11 @@
         w.start()
 
     tasks.join()
-    filehandles = [fh.close() for fh in filehandles]
 
-    result = sum([consumer.exitcode for consumer in consumers])
+    result = sum([consumer.exitcode for consumer in consumers
+                  if consumer.exitcode != None])
 
-    if restult == 0:
+    if result == 0:
         return True
     else:
         return False
@@ -427,5 +430,6 @@
     filename = 'svwiki-latest-stub-meta-history.xml'
     parse_dumpfile(project, filename, language_code)
 
+
 if __name__ == '__main__':
     debug()


_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to