http://www.mediawiki.org/wiki/Special:Code/MediaWiki/76355
Revision: 76355
Author: diederik
Date: 2010-11-08 23:52:50 +0000 (Mon, 08 Nov 2010)
Log Message:
-----------
Parallelized mergesort by distributing tasks over multiple cores.
Modified Paths:
--------------
trunk/tools/editor_trends/map_wiki_editors.py
trunk/tools/editor_trends/utils/sort.py
Modified: trunk/tools/editor_trends/map_wiki_editors.py
===================================================================
--- trunk/tools/editor_trends/map_wiki_editors.py 2010-11-08 23:35:24 UTC (rev 76354)
+++ trunk/tools/editor_trends/map_wiki_editors.py 2010-11-08 23:52:50 UTC (rev 76355)
@@ -202,7 +202,7 @@
print 'Still sleeping, queue is %s items long' %
output.qsize()
else:
- output.close()
+ fh.close()
if pbar:
print file, xml_queue.qsize()
Modified: trunk/tools/editor_trends/utils/sort.py
===================================================================
--- trunk/tools/editor_trends/utils/sort.py 2010-11-08 23:35:24 UTC (rev 76354)
+++ trunk/tools/editor_trends/utils/sort.py 2010-11-08 23:52:50 UTC (rev 76355)
@@ -25,9 +25,12 @@
'''
import heapq
+from multiprocessing import Queue
+from Queue import Empty
import settings
import utils
+import process_constructor as pc
from database import cache
def quick_sort(obs):
@@ -110,7 +113,7 @@
fh.close()
-def debug_merge_sorted_files(input, output):
+def merge_sorted_files(input, output):
files = utils.retrieve_file_list(input, 'txt', mask='')
filehandles = [utils.create_txt_filehandle(input, file, 'r',
settings.ENCODING) for file in files]
lines = merge_sorted_files(output, filehandles)
@@ -121,17 +124,50 @@
def debug_mergesort(input, output):
    '''
    Single-process sort of the txt files in directory input, for debugging.

    Each file matched by retrieve_file_list (with the mask below) is read,
    its lines are split on tabs, the rows are sorted with mergesort and the
    result is written with write_sorted_file(sorted_data, filename, output).
    '''
    files = utils.retrieve_file_list(input, 'txt', mask=r'((?!_sorted)\d)')
    for filename in files:
        fh = utils.create_txt_filehandle(input, filename, 'r',
                                         settings.ENCODING)
        try:
            data = fh.readlines()
        finally:
            # Close the handle even if reading fails.
            fh.close()
        # Strip newlines and split each line into its tab-separated fields.
        data = [d.replace('\n', '').split('\t') for d in data]
        sorted_data = mergesort(data)
        write_sorted_file(sorted_data, filename, output)
+
def mergesort_feeder(input_queue, **kwargs):
    '''
    Worker loop: sort files taken from input_queue until it is empty.

    Each item fetched from input_queue is passed as the file argument to
    utils.create_txt_filehandle (presumably a file name inside the input
    directory). The file's lines are split on tabs, sorted with mergesort
    and written with write_sorted_file. The loop ends when a non-blocking
    get raises Empty.

    input_queue -- queue of items to process (filled by the caller)
    kwargs['input'] -- directory the files are read from
    kwargs['output'] -- directory handed to write_sorted_file
    '''
    input_dir = kwargs.get('input', None)
    output_dir = kwargs.get('output', None)
    while True:
        try:
            filename = input_queue.get(block=False)
        except Empty:
            # NOTE(review): a non-blocking get on a multiprocessing queue can
            # raise Empty briefly after items are put; this assumes the queue
            # was fully loaded before the workers started — confirm.
            break
        fh = utils.create_txt_filehandle(input_dir, filename, 'r',
                                         settings.ENCODING)
        try:
            data = fh.readlines()
        finally:
            # Always release the handle so long-running workers don't leak it.
            fh.close()
        # Strip newlines and split each line into its tab-separated fields.
        data = [d.replace('\n', '').split('\t') for d in data]
        sorted_data = mergesort(data)
        write_sorted_file(sorted_data, filename, output_dir)
+
def mergesort_launcher(input, output):
    '''
    Sort the txt files in directory input in parallel, then merge them.

    The file list is divided into settings.NUMBER_OF_PROCESSES chunks, keyed
    0..N-1. The chunks are handed to pc.build_scaffolding together with
    pc.load_queue and mergesort_feeder as the worker.
    Afterwards merge_sorted_files(input, output) is called.
    '''
    kwargs = {'pbar': True,
              'nr_input_processors': settings.NUMBER_OF_PROCESSES,
              'nr_output_processors': settings.NUMBER_OF_PROCESSES,
              'input': input,
              'output': output,
              }
    nr_processes = settings.NUMBER_OF_PROCESSES
    files = utils.retrieve_file_list(input, 'txt')
    # Deal the files out round-robin so every file lands in exactly one chunk
    # and chunk sizes differ by at most one. Slicing with a rounded share
    # (len(files) / N) drops trailing files when the share rounds down and
    # yields only empty chunks when there are very few files.
    chunks = dict((x, files[x::nr_processes]) for x in range(nr_processes))

    pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False,
                         False, **kwargs)
    # NOTE(review): this module defines merge_sorted_files(input, output),
    # and that definition itself calls merge_sorted_files(output, filehandles).
    # The later definition shadows the earlier one, so this call appears to
    # end up recursing into itself instead of reaching the filehandle merger.
    # Confirm, and give one of the two functions a distinct name.
    merge_sorted_files(input, output)
+
if __name__ == '__main__':
    # os is not imported at the top of this module, so bring it in here;
    # without it the path construction below raises NameError.
    import os
    input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
    output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
    mergesort_launcher(input, output)
    # debug_mergesort(input, output)
_______________________________________________
MediaWiki-CVS mailing list
MediaWiki-CVS@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs