http://www.mediawiki.org/wiki/Special:Code/MediaWiki/76345

Revision: 76345
Author:   diederik
Date:     2010-11-08 22:12:49 +0000 (Mon, 08 Nov 2010)
Log Message:
-----------
Added mergesort module. By presorting data, significant reductions in 
processing time are achieved. 

Modified Paths:
--------------
    trunk/tools/editor_trends/construct_datasets.py
    trunk/tools/editor_trends/database/cache.py
    trunk/tools/editor_trends/manage.py
    trunk/tools/editor_trends/map_wiki_editors.py
    trunk/tools/editor_trends/settings.py
    trunk/tools/editor_trends/utils/process_constructor.py
    trunk/tools/editor_trends/utils/utils.py

Added Paths:
-----------
    trunk/tools/editor_trends/utils/sort.py

Modified: trunk/tools/editor_trends/construct_datasets.py
===================================================================
--- trunk/tools/editor_trends/construct_datasets.py     2010-11-08 21:55:05 UTC 
(rev 76344)
+++ trunk/tools/editor_trends/construct_datasets.py     2010-11-08 22:12:49 UTC 
(rev 76345)
@@ -126,11 +126,11 @@
     pc.build_scaffolding(pc.load_queue, retrieve_edits_by_contributor, 
'contributors')
 
 
-def debug_retrieve_edits_by_contributor_launcher():
+def debug_retrieve_edits_by_contributor_launcher(dbname):
     kwargs = {'debug': False,
-              'dbname': 'enwiki',
+              'dbname': dbname,
               }
-    ids = retrieve_editor_ids_mongo('enwiki', 'editors')
+    ids = retrieve_editor_ids_mongo(dbname, 'editors')
     input_queue = pc.load_queue(ids)
     q = Queue()
     generate_editor_dataset(input_queue, q, False, kwargs)
@@ -159,7 +159,6 @@
 def generate_editor_dataset_debug(dbname):
     ids = retrieve_editor_ids_mongo(dbname, 'editors')
     input_queue = pc.load_queue(ids)
-    #write_dataset(input_queue, [], 'enwiki')
     kwargs = {'nr_input_processors': 1,
               'nr_output_processors': 1,
               'debug': True,

Modified: trunk/tools/editor_trends/database/cache.py
===================================================================
--- trunk/tools/editor_trends/database/cache.py 2010-11-08 21:55:05 UTC (rev 
76344)
+++ trunk/tools/editor_trends/database/cache.py 2010-11-08 22:12:49 UTC (rev 
76345)
@@ -86,25 +86,10 @@
 
             if self.editors[key]['obs'] == self.treshold:
                 self.treshold_editors.add(key)
-#            self.update(key, self.editors[key]['edits'])
-#            del self.editors[key]
-#            self.n -= 10
-#            self.number_editors -= 1
 
     def update(self, editor, values):
-        #t = datetime.datetime.now()
         self.collection.update({'editor': editor}, {'$pushAll': {'edits': 
values}}, upsert=True)
-        #print 'It took %s to store editor %s;and the cache contains %s 
editors and %s items' % (datetime.datetime.now() - t, editor, 
self.number_editors, self.n)
 
-    def quick_sort(self, obs):
-        if obs == []:
-            return []
-        else:
-            pivot = obs[0]
-            lesser = self.quick_sort([x for x in obs[1:] if x < pivot])
-            greater = self.quick_sort([x for x in obs[1:] if x >= pivot])
-            return lesser + [pivot] + greater
-
     def store(self):
         utils.store_object(self, settings.BINARY_OBJECT_FILE_LOCATION, 
self.__repr__())
 

Modified: trunk/tools/editor_trends/manage.py
===================================================================
--- trunk/tools/editor_trends/manage.py 2010-11-08 21:55:05 UTC (rev 76344)
+++ trunk/tools/editor_trends/manage.py 2010-11-08 22:12:49 UTC (rev 76345)
@@ -79,17 +79,19 @@
     return project
 
 
-def generate_wikidump_filename(args):
-    return '%s-%s-%s' % (retrieve_projectname(args), 'latest', get_value(args, 
'file'))
+def generate_wikidump_filename(project, args):
+    return '%s-%s-%s' % (project, 'latest', get_value(args, 'file'))
 
 
 def determine_file_locations(args):
     locations = {}
     location = get_value(args, 'location') if get_value(args, 'location') != 
None else settings.XML_FILE_LOCATION
-    locations['language_code'] = retrieve_language(args)
-    locations['location'] = os.path.join(location, retrieve_language(args))
+    project = retrieve_project(args)
+    language_code = retrieve_language(args)
+    locations['language_code'] = language_code
+    locations['location'] = os.path.join(location, language_code, project)
     locations['project'] = retrieve_projectname(args)
-    locations['filename'] = generate_wikidump_filename(args)
+    locations['filename'] = generate_wikidump_filename(project, args)
     return locations
 
 
@@ -189,6 +191,12 @@
         except UnicodeEncodeError:
             print '%s' % language
 
+
+def detect_python_version():
+    version = ''.join(sys.version_info[0:2])
+    if version < settings.MINIMUM_PYTHON_VERSION:
+        raise 'Please upgrade to Python 2.6 or higher (but not Python 3.x).'  
+
 def about():
     print 'Editor Trends Software is (c) 2010 by the Wikimedia Foundation.'
     print 'Written by Diederik van Liere ([email protected]).'
@@ -253,6 +261,7 @@
     parser.add_argument('-prog', '--progress', action='store_true', 
default=True,
                       help='Indicate whether you want to have a progressbar.')
 
+    detect_python_version()
     args = parser.parse_args()
     config.load_configuration(args)
     locations = determine_file_locations(args)

Modified: trunk/tools/editor_trends/map_wiki_editors.py
===================================================================
--- trunk/tools/editor_trends/map_wiki_editors.py       2010-11-08 21:55:05 UTC 
(rev 76344)
+++ trunk/tools/editor_trends/map_wiki_editors.py       2010-11-08 22:12:49 UTC 
(rev 76345)
@@ -88,20 +88,22 @@
                 return - 1
 
 
-def output_editor_information(elem, data_queue, **kwargs):
+def output_editor_information(elem, output, **kwargs):
     '''
     @elem is an XML element containing 1 revision from a page
-    @data_queue is where to store the data
+    @output is where to store the data, either a queue or a filehandle
     @**kwargs contains extra information 
     
     the variable tags determines which attributes are being parsed, the values 
in
     this dictionary are the functions used to extract the data. 
     '''
-    tags = {'contributor': {'editor': extract_contributor_id, 'bot': 
determine_username_is_bot},
+    tags = {'contributor': {'editor': extract_contributor_id,
+                            'bot': determine_username_is_bot},
             'timestamp': {'date': xml.extract_text},
             }
     vars = {}
-
+    headers = ['editor', 'date', 'article']
+    destination = kwargs.pop('destination')
     revisions = elem.findall('revision')
     for revision in revisions:
         vars['article'] = elem.find('id').text.decode(settings.ENCODING)
@@ -114,12 +116,19 @@
         #print '%s\t%s\t%s\t%s\t' % (vars['article'], vars['contributor'], 
vars['timestamp'], vars['bot'])
         if vars['bot'] == 0 and vars['editor'] != -1 and vars['editor'] != 
None:
             vars.pop('bot')
-            vars['date'] = utils.convert_timestamp_to_date(vars['date'])
-            data_queue.put(vars)
+            if destination == 'queue':
+                output.put(vars)
+                vars['date'] = utils.convert_timestamp_to_date(vars['date'])
+            elif destination == 'file':
+                data =[]
+                for head in headers:
+                    data.append(vars[head])
+                utils.write_list_to_csv(data, output)
+                output.write('\n')
         vars = {}
 
 
-def parse_editors(xml_queue, data_queue, pbar, bots, **kwargs):
+def parse_editors(xml_queue, output, pbar, bots, **kwargs):
     '''
     @xml_queue contains the filenames of the files to be parsed
     @data_queue is an instance of Queue where the extracted data is stored for 
@@ -130,8 +139,10 @@
     
     Output is the data_queue that will be used by store_editors() 
     '''
-    file_location = os.path.join(settings.XML_FILE_LOCATION, 
kwargs.get('language', 'en'))
-    debug = kwargs.get('debug', None)
+    file_location = os.path.join(settings.XML_FILE_LOCATION, 
kwargs.get('language', 'en'), kwargs.get('project', 'wiki'))
+    debug = kwargs.get('debug', False)
+    destination = kwargs.get('destination', 'file')
+    
     if settings.DEBUG:
         messages = {}
         vars = {}
@@ -145,9 +156,13 @@
             if file == None:
                 print 'Swallowed a poison pill'
                 break
+
             data = xml.read_input(utils.create_txt_filehandle(file_location,
                                                       file, 'r',
                                                       
encoding=settings.ENCODING))
+            if destination == 'file':
+                name = file[:-4] + '.txt'
+                output = utils.create_txt_filehandle(file_location, name, 'w', 
settings.ENCODING)
             for raw_data in data:
                 xml_buffer = cStringIO.StringIO()
                 raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n')
@@ -156,7 +171,7 @@
                     raw_data = ''.join(raw_data)
                     xml_buffer.write(raw_data)
                     elem = cElementTree.XML(xml_buffer.getvalue())
-                    output_editor_information(elem, data_queue, bots=bots)
+                    output_editor_information(elem, output, bots=bots, 
destination=destination)
                 except SyntaxError, error:
                     print error
                     '''
@@ -176,26 +191,30 @@
                     print file, error
                     print raw_data[:12]
                     print 'String was supposed to be %s characters long' % 
sum([len(raw) for raw in raw_data])
+            if destination == 'queue':
+                output.put('NEXT')
+                while True:
+                    if output.qsize() < 100000:
+                        break
+                    else:
+                        time.sleep(10)
+                        print 'Still sleeping, queue is %s items long' % 
output.qsize()
 
-            data_queue.put('NEXT')
+            else:
+                output.close()
+
             if pbar:
-                print file, xml_queue.qsize(), data_queue.qsize()
+                print file, xml_queue.qsize()
                 #utils.update_progressbar(pbar, xml_queue)
+                
             if debug:
                 break
-
-            while True:
-                if data_queue.qsize() < 100000:
-                    break
-                else:
-                    time.sleep(10)
-                    print 'Still sleeping, queue is %s items long' % 
data_queue.qsize()
-
+            
         except Empty:
             break
 
-    #for x in xrange(4):
-    data_queue.put(None)
+    if destination == 'queue':
+        data_queue.put(None)
 
     if settings.DEBUG:
         utils.report_error_messages(messages, parse_editors)
@@ -263,9 +282,9 @@
         cache[c] = {}
         editor_cache.add('NEXT', '')
     cache = {}
-    
 
 
+
 def load_bot_ids():
     '''
     Loader function to retrieve list of id's of known Wikipedia bots. 
@@ -279,17 +298,20 @@
     return ids
 
 
-def run_parse_editors(dbname, language, location):
+def run_parse_editors(location, language, project):
     ids = load_bot_ids()
     kwargs = {'bots': ids,
-              'dbname': dbname,
+              'dbname': language + project,
+              'language': language,
+              'project': project,
               'pbar': True,
-              'nr_input_processors': 2,
-              'nr_output_processors': 2,
-              'language': language,
+              'destination': 'file',
+              'nr_input_processors': settings.NUMBER_OF_PROCESSES,
+              'nr_output_processors': settings.NUMBER_OF_PROCESSES,
               }
     chunks = {}
-    files = utils.retrieve_file_list(location, 'xml')
+    source = os.path.join(location, language, project)
+    files = utils.retrieve_file_list(source, 'xml')
     parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
     a = 0
     for x in xrange(settings.NUMBER_OF_PROCESSES):
@@ -297,18 +319,18 @@
         chunks[x] = files[a:b]
         a = (x + 1) * parts
 
-    pc.build_scaffolding(pc.load_queue, parse_editors, chunks, store_editors, 
True, **kwargs)
-    search_cache_for_missed_editors(dbname)
+    pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False, 
**kwargs)
+    #search_cache_for_missed_editors(dbname)
 
 
 def debug_parse_editors(dbname):
     q = JoinableQueue()
-    parse_editors('en\\522.xml', q, None, None, True)
+    parse_editors('522.xml', q, None, None, debug=True, destination='file')
     store_editors(q, [], dbname)
-    search_cache_for_missed_editors(dbname)
+    #search_cache_for_missed_editors(dbname)
 
 
 if __name__ == "__main__":
-    #debug_parse_editors('test')
-    run_parse_editors('test', 'en')
+    #debug_parse_editors('test2')
+    run_parse_editors(settings.XML_FILE_LOCATION, 'en', 'wiki')
     pass

Modified: trunk/tools/editor_trends/settings.py
===================================================================
--- trunk/tools/editor_trends/settings.py       2010-11-08 21:55:05 UTC (rev 
76344)
+++ trunk/tools/editor_trends/settings.py       2010-11-08 22:12:49 UTC (rev 
76345)
@@ -41,6 +41,7 @@
 IGNORE_DIRS = ['wikistats', 'zips']
 ROOT = '/' if OS != 'Windows' else 'c:\\'
 
+MINIMUM_PYTHON_VERSION = 2.6
 
 dirs = [name for name in os.listdir(WORKING_DIRECTORY) if
         os.path.isdir(os.path.join(WORKING_DIRECTORY, name))]

Modified: trunk/tools/editor_trends/utils/process_constructor.py
===================================================================
--- trunk/tools/editor_trends/utils/process_constructor.py      2010-11-08 
21:55:05 UTC (rev 76344)
+++ trunk/tools/editor_trends/utils/process_constructor.py      2010-11-08 
22:12:49 UTC (rev 76345)
@@ -57,6 +57,7 @@
     nr_output_processors = kwargs.pop('nr_output_processors')
     input_queues = {}
     result_queues = {}
+    
     #assert len(obj) == nr_input_processors
     #if result_queue:
     #    assert len(obj)== nr_output_processors

Added: trunk/tools/editor_trends/utils/sort.py
===================================================================
--- trunk/tools/editor_trends/utils/sort.py                             (rev 0)
+++ trunk/tools/editor_trends/utils/sort.py     2010-11-08 22:12:49 UTC (rev 
76345)
@@ -0,0 +1,119 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+'''
+Copyright (C) 2010 by Diederik van Liere ([email protected])
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere ([email protected])', ])
+__author__email = 'dvanliere at gmail dot com'
+__date__ = '2010-11-07'
+__version__ = '0.1'
+
+'''
+This module provides a small number of sorting algorithms including mergesort,
+external mergesort and quicksort. By presorting the data, considerably
+efficiency gains can be realized when inserting the data in MongoDB.
+'''
+
+import heapq
+
+import settings
+import utils
+
+def quick_sort(obs):
+    if obs == []:
+        return []
+    else:
+        pivot = obs[0]
+        lesser = quick_sort([x for x in obs[1:] if x < pivot])
+        greater = quick_sort([x for x in obs[1:] if x >= pivot])
+        return lesser + [pivot] + greater
+
+def mergesort(n):
+        """Recursively merge sort a list. Returns the sorted list."""
+        front = n[:len(n) / 2]
+        back = n[len(n) / 2:]
+
+        if len(front) > 1:
+                front = mergesort(front)
+        if len(back) > 1:
+                back = mergesort(back)
+
+        return merge(front, back)
+
+
+def merge(front, back):
+        """Merge two sorted lists together. Returns the merged list."""
+        result = []
+        while front and back:
+                # pick the smaller one from the front and stick it on
+                # note that list.pop(0) is a linear operation, so this gives 
quadratic running time...
+                result.append(front.pop(0) if front[0] <= back[0] else 
back.pop(0))
+        # add the remaining end
+        result.extend(front or back)
+        return result
+
+
+def readline(file):
+    for line in file:
+        if line == '':
+            continue
+        else:
+            line = line.replace('\n', '')
+            line = line.split('\t')
+            yield line
+
+
+def merge_sorted_files(output, files):
+    output = utils.create_txt_filehandle(output, 'merged.txt', 'w', 
settings.ENCODING)
+    lines = 0
+    for line in heapq.merge(*[readline(file) for file in files]):
+        output.write(line)
+        lines += 1
+    output.close()
+    return lines
+
+
+def write_sorted_file(sorted_data, file, output):
+    file = file.split('.')
+    file[0] = file[0] + '_sorted'
+    file = '.'.join(file)
+    fh = utils.create_txt_filehandle(output, file, 'w', settings.ENCODING)
+    utils.write_list_to_csv(sorted_data, fh)
+    fh.close()
+
+
+def debug_merge_sorted_files(input, output):
+    files = utils.retrieve_file_list(input, 'txt', mask='')
+    filehandles = [utils.create_txt_filehandle(input, file, 'r', 
settings.ENCODING) for file in files]
+    lines = merge_sorted_files(output, filehandles)
+    filehandles = [fh.close() for fh in filehandles]
+    print lines
+
+
+def debug_mergesort(input, output):
+    files = utils.retrieve_file_list(input, 'txt', mask='((?!_sorted)\d)')
+    for file in files:
+        fh = utils.create_txt_filehandle(input, file, 'r', settings.ENCODING)
+        data = fh.readlines()
+        fh.close()
+        data = [d.replace('\n', '') for d in data]
+        data = [d.split('\t') for d in data]
+        sorted_data = mergesort(data)
+        write_sorted_file(sorted_data, file, output)
+
+
+if __name__ == '__main__':
+    input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki')
+    output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
+    debug_mergesort(input, output)
+    #debug_merge_sorted_files(input, output)


Property changes on: trunk/tools/editor_trends/utils/sort.py
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: trunk/tools/editor_trends/utils/utils.py
===================================================================
--- trunk/tools/editor_trends/utils/utils.py    2010-11-08 21:55:05 UTC (rev 
76344)
+++ trunk/tools/editor_trends/utils/utils.py    2010-11-08 22:12:49 UTC (rev 
76345)
@@ -132,6 +132,11 @@
 
 # read / write data related functions
 def read_data_from_csv(filename, encoding):
+    '''
+    @filename is the path (either absolute or relative) including the name of
+    of the file
+    @encoding is usually utf-8 
+    '''
     if hasattr(filename, '__call__'):
         filename = construct_filename(filename)
 
@@ -156,6 +161,10 @@
 
 
 def determine_file_mode(extension):
+    '''
+    Checks if a given extension is an ASCII extension or not. The settings file
+    provides known ASCII extensions. 
+    '''
     if extension in settings.ASCII:
         return 'w'
     else:
@@ -163,15 +172,30 @@
 
 
 def write_list_to_csv(data, fh, recursive=False):
+    '''
+    @data is a list which can contain other lists that will be written as a
+    single line to a textfile
+    @fh is a handle to an open text
+    
+    The calling function is responsible for:
+        1) writing a newline
+        2) closing the filehandle
+    '''
+    tab = False
     if recursive:
         recursive = False
-    for d in data:
+    for x, d in enumerate(data):
+        if tab:
+            fh.write('\t')
         if type(d) == type([]):
             recursive = write_list_to_csv(d, fh, True)
         else:
-            fh.write('%s\t' % d)
+            fh.write('%s' % d)
+            tab = True
     if recursive:
+        tab = False
         return True
+    fh.write('\n')
 
     
 def write_dict_to_csv(data, fh):
@@ -267,31 +291,37 @@
 
 
 def create_dict_from_csv_file(filename, encoding):
+    '''
+    Constructs a dictionary from a txtfile
+    '''
     d = {}
     for line in read_data_from_csv(filename, encoding):
         line = clean_string(line)
         value, key = line.split('\t')
         d[key] = value
-
     return d
 
 
-def retrieve_file_list(location, extension, mask=''):
+def retrieve_file_list(location, extension, mask=None):
     '''
     Retrieve a list of files from a specified location.
     @location: either an absolute or relative path
     @extension: only include files with extension (optional)
-    @mask: only include files that start with mask (optional)
+    @mask: only include files that start with mask (optional), this is
+    interpreted as a regular expression. 
     
     @return: a list of files matching the criteria
     '''
+    if mask:
+        mask = re.compile(mask)
+    else:
+        mask = re.compile('[\w\d*]')
     all_files = os.listdir(location)
-    if not extension.startswith('.'):
-        extension = '.' + extension
     files = []
     for file in all_files:
-        if file.startswith(mask) and file.endswith(extension):
-            files.append(file)
+        file = file.split('.')
+        if re.match(mask, file[0]) and file[1].endswith(extension):
+            files.append('.'.join(file))
     return files
 
 


_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to