http://www.mediawiki.org/wiki/Special:Code/MediaWiki/76355

Revision: 76355
Author:   diederik
Date:     2010-11-08 23:52:50 +0000 (Mon, 08 Nov 2010)
Log Message:
-----------
Parallelized mergesort by distributing tasks over multiple cores. 

Modified Paths:
--------------
    trunk/tools/editor_trends/map_wiki_editors.py
    trunk/tools/editor_trends/utils/sort.py

Modified: trunk/tools/editor_trends/map_wiki_editors.py
===================================================================
--- trunk/tools/editor_trends/map_wiki_editors.py       2010-11-08 23:35:24 UTC (rev 76354)
+++ trunk/tools/editor_trends/map_wiki_editors.py       2010-11-08 23:52:50 UTC (rev 76355)
@@ -202,7 +202,7 @@
                         print 'Still sleeping, queue is %s items long' % output.qsize()
 
             else:
-                output.close()
+                fh.close()
 
             if pbar:
                 print file, xml_queue.qsize()

Modified: trunk/tools/editor_trends/utils/sort.py
===================================================================
--- trunk/tools/editor_trends/utils/sort.py     2010-11-08 23:35:24 UTC (rev 76354)
+++ trunk/tools/editor_trends/utils/sort.py     2010-11-08 23:52:50 UTC (rev 76355)
@@ -25,9 +25,12 @@
 '''
 
 import heapq
+from multiprocessing import Queue
+from Queue import Empty
 
 import settings
 import utils
+import process_constructor as pc
 from database import cache
 
 def quick_sort(obs):
@@ -110,7 +113,7 @@
     fh.close()
 
 
-def debug_merge_sorted_files(input, output):
+def merge_sorted_files(input, output):
     files = utils.retrieve_file_list(input, 'txt', mask='')
     filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in files]
     lines = merge_sorted_files(output, filehandles)
@@ -121,17 +124,50 @@
 def debug_mergesort(input, output):
     files = utils.retrieve_file_list(input, 'txt', mask='((?!_sorted)\d)')
     for file in files:
-        fh = utils.create_txt_filehandle(input, file, 'r', settings.ENCODING)
-        data = fh.readlines()
-        fh.close()
-        data = [d.replace('\n', '') for d in data]
-        data = [d.split('\t') for d in data]
-        sorted_data = mergesort(data)
-        write_sorted_file(sorted_data, file, output)
+        pass
+        
+def mergesort_feeder(input_queue, **kwargs):
+    input = kwargs.get('input', None)
+    output = kwargs.get('output', None)
+    while True:
+        try:
+            file = input_queue.get(block=False)
+            fh = utils.create_txt_filehandle(input, file, 'r', settings.ENCODING)
+            data = fh.readlines()
+            fh.close()
+            data = [d.replace('\n', '') for d in data]
+            data = [d.split('\t') for d in data]
+            sorted_data = mergesort(data)
+            write_sorted_file(sorted_data, file, output)
+        except Empty:
+            break
+        
 
 
+def mergesort_launcher(input, output):
+    kwargs = {'pbar': True,
+              'nr_input_processors': settings.NUMBER_OF_PROCESSES,
+              'nr_output_processors': settings.NUMBER_OF_PROCESSES,
+              'input': input,
+              'output': output,
+              }
+    chunks = {}
+    
+    files = utils.retrieve_file_list(input, 'txt')
+    parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
+    a = 0
+    
+    for x in xrange(settings.NUMBER_OF_PROCESSES):
+        b = a + parts
+        chunks[x] = files[a:b]
+        a = (x + 1) * parts
+
+    pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs)
+    merge_sorted_files(input, output)
+
 if __name__ == '__main__':
-    input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki')
+    input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
     output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
-    debug_mergesort(input, output)
+    mergesort_launcher(input, output)
+    #debug_mergesort(input, output)
     #debug_merge_sorted_files(input, output)


_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to