http://www.mediawiki.org/wiki/Special:Code/MediaWiki/76345
Revision: 76345
Author: diederik
Date: 2010-11-08 22:12:49 +0000 (Mon, 08 Nov 2010)
Log Message:
-----------
Added mergesort module. By presorting data, significant reductions in
processing time are achieved.
Modified Paths:
--------------
trunk/tools/editor_trends/construct_datasets.py
trunk/tools/editor_trends/database/cache.py
trunk/tools/editor_trends/manage.py
trunk/tools/editor_trends/map_wiki_editors.py
trunk/tools/editor_trends/settings.py
trunk/tools/editor_trends/utils/process_constructor.py
trunk/tools/editor_trends/utils/utils.py
Added Paths:
-----------
trunk/tools/editor_trends/utils/sort.py
Modified: trunk/tools/editor_trends/construct_datasets.py
===================================================================
--- trunk/tools/editor_trends/construct_datasets.py 2010-11-08 21:55:05 UTC
(rev 76344)
+++ trunk/tools/editor_trends/construct_datasets.py 2010-11-08 22:12:49 UTC
(rev 76345)
@@ -126,11 +126,11 @@
pc.build_scaffolding(pc.load_queue, retrieve_edits_by_contributor,
'contributors')
-def debug_retrieve_edits_by_contributor_launcher():
+def debug_retrieve_edits_by_contributor_launcher(dbname):
kwargs = {'debug': False,
- 'dbname': 'enwiki',
+ 'dbname': dbname,
}
- ids = retrieve_editor_ids_mongo('enwiki', 'editors')
+ ids = retrieve_editor_ids_mongo(dbname, 'editors')
input_queue = pc.load_queue(ids)
q = Queue()
generate_editor_dataset(input_queue, q, False, kwargs)
@@ -159,7 +159,6 @@
def generate_editor_dataset_debug(dbname):
ids = retrieve_editor_ids_mongo(dbname, 'editors')
input_queue = pc.load_queue(ids)
- #write_dataset(input_queue, [], 'enwiki')
kwargs = {'nr_input_processors': 1,
'nr_output_processors': 1,
'debug': True,
Modified: trunk/tools/editor_trends/database/cache.py
===================================================================
--- trunk/tools/editor_trends/database/cache.py 2010-11-08 21:55:05 UTC (rev
76344)
+++ trunk/tools/editor_trends/database/cache.py 2010-11-08 22:12:49 UTC (rev
76345)
@@ -86,25 +86,10 @@
if self.editors[key]['obs'] == self.treshold:
self.treshold_editors.add(key)
-# self.update(key, self.editors[key]['edits'])
-# del self.editors[key]
-# self.n -= 10
-# self.number_editors -= 1
def update(self, editor, values):
- #t = datetime.datetime.now()
self.collection.update({'editor': editor}, {'$pushAll': {'edits':
values}}, upsert=True)
- #print 'It took %s to store editor %s;and the cache contains %s
editors and %s items' % (datetime.datetime.now() - t, editor,
self.number_editors, self.n)
- def quick_sort(self, obs):
- if obs == []:
- return []
- else:
- pivot = obs[0]
- lesser = self.quick_sort([x for x in obs[1:] if x < pivot])
- greater = self.quick_sort([x for x in obs[1:] if x >= pivot])
- return lesser + [pivot] + greater
-
def store(self):
utils.store_object(self, settings.BINARY_OBJECT_FILE_LOCATION,
self.__repr__())
Modified: trunk/tools/editor_trends/manage.py
===================================================================
--- trunk/tools/editor_trends/manage.py 2010-11-08 21:55:05 UTC (rev 76344)
+++ trunk/tools/editor_trends/manage.py 2010-11-08 22:12:49 UTC (rev 76345)
@@ -79,17 +79,19 @@
return project
-def generate_wikidump_filename(args):
- return '%s-%s-%s' % (retrieve_projectname(args), 'latest', get_value(args,
'file'))
+def generate_wikidump_filename(project, args):
+ return '%s-%s-%s' % (project, 'latest', get_value(args, 'file'))
def determine_file_locations(args):
locations = {}
location = get_value(args, 'location') if get_value(args, 'location') !=
None else settings.XML_FILE_LOCATION
- locations['language_code'] = retrieve_language(args)
- locations['location'] = os.path.join(location, retrieve_language(args))
+ project = retrieve_project(args)
+ language_code = retrieve_language(args)
+ locations['language_code'] = language_code
+ locations['location'] = os.path.join(location, language_code, project)
locations['project'] = retrieve_projectname(args)
- locations['filename'] = generate_wikidump_filename(args)
+ locations['filename'] = generate_wikidump_filename(project, args)
return locations
@@ -189,6 +191,12 @@
except UnicodeEncodeError:
print '%s' % language
+
def detect_python_version():
    '''
    Abort when the running interpreter is older than the version required
    by settings.MINIMUM_PYTHON_VERSION (a float such as 2.6).
    '''
    # The original joined the integers of sys.version_info into a string
    # (a TypeError) and then raised a plain string, which is illegal from
    # Python 2.6 onwards. Compare (major, minor) tuples instead, which
    # also orders a hypothetical 2.10 after 2.6 correctly.
    minimum = tuple(int(i) for i in str(settings.MINIMUM_PYTHON_VERSION).split('.'))
    if sys.version_info[0:2] < minimum:
        raise RuntimeError('Please upgrade to Python 2.6 or higher (but not Python 3.x).')
+
def about():
print 'Editor Trends Software is (c) 2010 by the Wikimedia Foundation.'
print 'Written by Diederik van Liere ([email protected]).'
@@ -253,6 +261,7 @@
parser.add_argument('-prog', '--progress', action='store_true',
default=True,
help='Indicate whether you want to have a progressbar.')
+ detect_python_version()
args = parser.parse_args()
config.load_configuration(args)
locations = determine_file_locations(args)
Modified: trunk/tools/editor_trends/map_wiki_editors.py
===================================================================
--- trunk/tools/editor_trends/map_wiki_editors.py 2010-11-08 21:55:05 UTC
(rev 76344)
+++ trunk/tools/editor_trends/map_wiki_editors.py 2010-11-08 22:12:49 UTC
(rev 76345)
@@ -88,20 +88,22 @@
return - 1
-def output_editor_information(elem, data_queue, **kwargs):
+def output_editor_information(elem, output, **kwargs):
'''
@elem is an XML element containing 1 revision from a page
- @data_queue is where to store the data
+ @output is where to store the data, either a queue or a filehandle
@**kwargs contains extra information
the variable tags determines which attributes are being parsed, the values
in
this dictionary are the functions used to extract the data.
'''
- tags = {'contributor': {'editor': extract_contributor_id, 'bot':
determine_username_is_bot},
+ tags = {'contributor': {'editor': extract_contributor_id,
+ 'bot': determine_username_is_bot},
'timestamp': {'date': xml.extract_text},
}
vars = {}
-
+ headers = ['editor', 'date', 'article']
+ destination = kwargs.pop('destination')
revisions = elem.findall('revision')
for revision in revisions:
vars['article'] = elem.find('id').text.decode(settings.ENCODING)
@@ -114,12 +116,19 @@
#print '%s\t%s\t%s\t%s\t' % (vars['article'], vars['contributor'],
vars['timestamp'], vars['bot'])
if vars['bot'] == 0 and vars['editor'] != -1 and vars['editor'] !=
None:
vars.pop('bot')
- vars['date'] = utils.convert_timestamp_to_date(vars['date'])
- data_queue.put(vars)
+ if destination == 'queue':
+ output.put(vars)
+ vars['date'] = utils.convert_timestamp_to_date(vars['date'])
+ elif destination == 'file':
+ data =[]
+ for head in headers:
+ data.append(vars[head])
+ utils.write_list_to_csv(data, output)
+ output.write('\n')
vars = {}
-def parse_editors(xml_queue, data_queue, pbar, bots, **kwargs):
+def parse_editors(xml_queue, output, pbar, bots, **kwargs):
'''
@xml_queue contains the filenames of the files to be parsed
@data_queue is an instance of Queue where the extracted data is stored for
@@ -130,8 +139,10 @@
Output is the data_queue that will be used by store_editors()
'''
- file_location = os.path.join(settings.XML_FILE_LOCATION,
kwargs.get('language', 'en'))
- debug = kwargs.get('debug', None)
+ file_location = os.path.join(settings.XML_FILE_LOCATION,
kwargs.get('language', 'en'), kwargs.get('project', 'wiki'))
+ debug = kwargs.get('debug', False)
+ destination = kwargs.get('destination', 'file')
+
if settings.DEBUG:
messages = {}
vars = {}
@@ -145,9 +156,13 @@
if file == None:
print 'Swallowed a poison pill'
break
+
data = xml.read_input(utils.create_txt_filehandle(file_location,
file, 'r',
encoding=settings.ENCODING))
+ if destination == 'file':
+ name = file[:-4] + '.txt'
+ output = utils.create_txt_filehandle(file_location, name, 'w',
settings.ENCODING)
for raw_data in data:
xml_buffer = cStringIO.StringIO()
raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n')
@@ -156,7 +171,7 @@
raw_data = ''.join(raw_data)
xml_buffer.write(raw_data)
elem = cElementTree.XML(xml_buffer.getvalue())
- output_editor_information(elem, data_queue, bots=bots)
+ output_editor_information(elem, output, bots=bots,
destination=destination)
except SyntaxError, error:
print error
'''
@@ -176,26 +191,30 @@
print file, error
print raw_data[:12]
print 'String was supposed to be %s characters long' %
sum([len(raw) for raw in raw_data])
+ if destination == 'queue':
+ output.put('NEXT')
+ while True:
+ if output.qsize() < 100000:
+ break
+ else:
+ time.sleep(10)
+ print 'Still sleeping, queue is %s items long' %
output.qsize()
- data_queue.put('NEXT')
+ else:
+ output.close()
+
if pbar:
- print file, xml_queue.qsize(), data_queue.qsize()
+ print file, xml_queue.qsize()
#utils.update_progressbar(pbar, xml_queue)
+
if debug:
break
-
- while True:
- if data_queue.qsize() < 100000:
- break
- else:
- time.sleep(10)
- print 'Still sleeping, queue is %s items long' %
data_queue.qsize()
-
+
except Empty:
break
- #for x in xrange(4):
- data_queue.put(None)
+ if destination == 'queue':
+ data_queue.put(None)
if settings.DEBUG:
utils.report_error_messages(messages, parse_editors)
@@ -263,9 +282,9 @@
cache[c] = {}
editor_cache.add('NEXT', '')
cache = {}
-
+
def load_bot_ids():
'''
Loader function to retrieve list of id's of known Wikipedia bots.
@@ -279,17 +298,20 @@
return ids
-def run_parse_editors(dbname, language, location):
+def run_parse_editors(location, language, project):
ids = load_bot_ids()
kwargs = {'bots': ids,
- 'dbname': dbname,
+ 'dbname': language + project,
+ 'language': language,
+ 'project': project,
'pbar': True,
- 'nr_input_processors': 2,
- 'nr_output_processors': 2,
- 'language': language,
+ 'destination': 'file',
+ 'nr_input_processors': settings.NUMBER_OF_PROCESSES,
+ 'nr_output_processors': settings.NUMBER_OF_PROCESSES,
}
chunks = {}
- files = utils.retrieve_file_list(location, 'xml')
+ source = os.path.join(location, language, project)
+ files = utils.retrieve_file_list(source, 'xml')
parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
a = 0
for x in xrange(settings.NUMBER_OF_PROCESSES):
@@ -297,18 +319,18 @@
chunks[x] = files[a:b]
a = (x + 1) * parts
- pc.build_scaffolding(pc.load_queue, parse_editors, chunks, store_editors,
True, **kwargs)
- search_cache_for_missed_editors(dbname)
+ pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False,
**kwargs)
+ #search_cache_for_missed_editors(dbname)
def debug_parse_editors(dbname):
q = JoinableQueue()
- parse_editors('en\\522.xml', q, None, None, True)
+ parse_editors('522.xml', q, None, None, debug=True, destination='file')
store_editors(q, [], dbname)
- search_cache_for_missed_editors(dbname)
+ #search_cache_for_missed_editors(dbname)
if __name__ == "__main__":
- #debug_parse_editors('test')
- run_parse_editors('test', 'en')
+ #debug_parse_editors('test2')
+ run_parse_editors(settings.XML_FILE_LOCATION, 'en', 'wiki')
pass
Modified: trunk/tools/editor_trends/settings.py
===================================================================
--- trunk/tools/editor_trends/settings.py 2010-11-08 21:55:05 UTC (rev
76344)
+++ trunk/tools/editor_trends/settings.py 2010-11-08 22:12:49 UTC (rev
76345)
@@ -41,6 +41,7 @@
IGNORE_DIRS = ['wikistats', 'zips']
ROOT = '/' if OS != 'Windows' else 'c:\\'
+MINIMUM_PYTHON_VERSION = 2.6
dirs = [name for name in os.listdir(WORKING_DIRECTORY) if
os.path.isdir(os.path.join(WORKING_DIRECTORY, name))]
Modified: trunk/tools/editor_trends/utils/process_constructor.py
===================================================================
--- trunk/tools/editor_trends/utils/process_constructor.py 2010-11-08
21:55:05 UTC (rev 76344)
+++ trunk/tools/editor_trends/utils/process_constructor.py 2010-11-08
22:12:49 UTC (rev 76345)
@@ -57,6 +57,7 @@
nr_output_processors = kwargs.pop('nr_output_processors')
input_queues = {}
result_queues = {}
+
#assert len(obj) == nr_input_processors
#if result_queue:
# assert len(obj)== nr_output_processors
Added: trunk/tools/editor_trends/utils/sort.py
===================================================================
--- trunk/tools/editor_trends/utils/sort.py (rev 0)
+++ trunk/tools/editor_trends/utils/sort.py 2010-11-08 22:12:49 UTC (rev
76345)
@@ -0,0 +1,119 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+'''
+Copyright (C) 2010 by Diederik van Liere ([email protected])
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere ([email protected])', ])
+__author__email = 'dvanliere at gmail dot com'
+__date__ = '2010-11-07'
+__version__ = '0.1'
+
+'''
+This module provides a small number of sorting algorithms including mergesort,
+external mergesort and quicksort. By presorting the data, considerable
+efficiency gains can be realized when inserting the data in MongoDB.
+'''
+
+import heapq
+
+import settings
+import utils
+
def quick_sort(obs):
    '''
    Return a new list with the items of @obs in ascending order.

    Plain recursive quicksort: the first element is the pivot, the
    remainder is partitioned in a single pass (the original scanned the
    tail twice, once per partition, and compared against [] instead of
    testing truthiness).
    '''
    if not obs:
        return []
    pivot = obs[0]
    lesser = []
    greater = []
    for x in obs[1:]:
        if x < pivot:
            lesser.append(x)
        else:
            greater.append(x)
    return quick_sort(lesser) + [pivot] + quick_sort(greater)
+
def mergesort(n):
    """Recursively merge sort a list. Returns a new sorted list."""
    # Floor division keeps the split point an int under both Python 2
    # and Python 3 (plain / would yield a float on 3.x).
    middle = len(n) // 2
    front = n[:middle]
    back = n[middle:]

    if len(front) > 1:
        front = mergesort(front)
    if len(back) > 1:
        back = mergesort(back)

    return merge(front, back)


def merge(front, back):
    """Merge two sorted lists together. Returns the merged list."""
    # Walk both lists with index pointers instead of list.pop(0); pop(0)
    # shifts the whole list each call, which made the original quadratic
    # (as its own comment conceded).
    result = []
    i = j = 0
    while i < len(front) and j < len(back):
        if front[i] <= back[j]:
            result.append(front[i])
            i += 1
        else:
            result.append(back[j])
            j += 1
    # Append whichever tail is left over; at most one of these is non-empty.
    result.extend(front[i:])
    result.extend(back[j:])
    return result
+
+
def readline(file):
    '''
    Generator: yield each non-empty line of the open handle @file as a
    list of its tab-separated fields.
    '''
    for line in file:
        # Strip the newline before the emptiness test; the original
        # tested first, so a blank line ('\n') slipped through and was
        # yielded as [''].
        line = line.replace('\n', '')
        if line == '':
            continue
        yield line.split('\t')
+
+
def merge_sorted_files(output, files):
    '''
    Merge the presorted open handles in @files into a single file
    'merged.txt' in the @output directory, using a k-way heap merge so
    only one line per input is held in memory. Returns the number of
    lines written.
    '''
    fh = utils.create_txt_filehandle(output, 'merged.txt', 'w', settings.ENCODING)
    lines = 0
    for line in heapq.merge(*[readline(file) for file in files]):
        # readline() yields a list of fields; serialize it back into a
        # tab-separated line (the original handed the raw list to
        # write(), which raises a TypeError).
        fh.write('\t'.join(line) + '\n')
        lines += 1
    fh.close()
    return lines
+
+
def write_sorted_file(sorted_data, file, output):
    '''
    Store @sorted_data in the @output directory, under the name of the
    source @file with '_sorted' appended to its basename.
    '''
    parts = file.split('.')
    parts[0] += '_sorted'
    sorted_name = '.'.join(parts)
    fh = utils.create_txt_filehandle(output, sorted_name, 'w', settings.ENCODING)
    utils.write_list_to_csv(sorted_data, fh)
    fh.close()
+
+
+def debug_merge_sorted_files(input, output):
+ files = utils.retrieve_file_list(input, 'txt', mask='')
+ filehandles = [utils.create_txt_filehandle(input, file, 'r',
settings.ENCODING) for file in files]
+ lines = merge_sorted_files(output, filehandles)
+ filehandles = [fh.close() for fh in filehandles]
+ print lines
+
+
def debug_mergesort(input, output):
    '''
    Debug helper: mergesort every not-yet-sorted txt file in @input and
    write each result to @output (see write_sorted_file for the naming).
    '''
    # The mask excludes files whose name already carries the _sorted suffix.
    for file in utils.retrieve_file_list(input, 'txt', mask='((?!_sorted)\d)'):
        fh = utils.create_txt_filehandle(input, file, 'r', settings.ENCODING)
        raw = fh.readlines()
        fh.close()
        rows = [line.replace('\n', '').split('\t') for line in raw]
        sorted_data = mergesort(rows)
        write_sorted_file(sorted_data, file, output)
+
+
if __name__ == '__main__':
    # os is never imported at module level in this file (only heapq,
    # settings and utils are); import it here so the debug entry point
    # runs stand-alone instead of raising NameError.
    import os
    input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki')
    output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
    debug_mergesort(input, output)
    #debug_merge_sorted_files(input, output)
Property changes on: trunk/tools/editor_trends/utils/sort.py
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: trunk/tools/editor_trends/utils/utils.py
===================================================================
--- trunk/tools/editor_trends/utils/utils.py 2010-11-08 21:55:05 UTC (rev
76344)
+++ trunk/tools/editor_trends/utils/utils.py 2010-11-08 22:12:49 UTC (rev
76345)
@@ -132,6 +132,11 @@
# read / write data related functions
def read_data_from_csv(filename, encoding):
+ '''
+ @filename is the path (either absolute or relative) including the name of
+ of the file
+ @encoding is usually utf-8
+ '''
if hasattr(filename, '__call__'):
filename = construct_filename(filename)
@@ -156,6 +161,10 @@
def determine_file_mode(extension):
+ '''
+ Checks if a given extension is an ASCII extension or not. The settings file
+ provides known ASCII extensions.
+ '''
if extension in settings.ASCII:
return 'w'
else:
@@ -163,15 +172,30 @@
def write_list_to_csv(data, fh, recursive=False):
+ '''
+ @data is a list which can contain other lists that will be written as a
+ single line to a textfile
+ @fh is a handle to an open text
+
+ The calling function is responsible for:
+ 1) writing a newline
+ 2) closing the filehandle
+ '''
+ tab = False
if recursive:
recursive = False
- for d in data:
+ for x, d in enumerate(data):
+ if tab:
+ fh.write('\t')
if type(d) == type([]):
recursive = write_list_to_csv(d, fh, True)
else:
- fh.write('%s\t' % d)
+ fh.write('%s' % d)
+ tab = True
if recursive:
+ tab = False
return True
+ fh.write('\n')
def write_dict_to_csv(data, fh):
@@ -267,31 +291,37 @@
def create_dict_from_csv_file(filename, encoding):
+ '''
+ Constructs a dictionary from a txtfile
+ '''
d = {}
for line in read_data_from_csv(filename, encoding):
line = clean_string(line)
value, key = line.split('\t')
d[key] = value
-
return d
-def retrieve_file_list(location, extension, mask=''):
+def retrieve_file_list(location, extension, mask=None):
'''
Retrieve a list of files from a specified location.
@location: either an absolute or relative path
@extension: only include files with extension (optional)
- @mask: only include files that start with mask (optional)
+ @mask: only include files that start with mask (optional), this is
+ interpreted as a regular expression.
@return: a list of files matching the criteria
'''
+ if mask:
+ mask = re.compile(mask)
+ else:
+ mask = re.compile('[\w\d*]')
all_files = os.listdir(location)
- if not extension.startswith('.'):
- extension = '.' + extension
files = []
for file in all_files:
- if file.startswith(mask) and file.endswith(extension):
- files.append(file)
+ file = file.split('.')
+ if re.match(mask, file[0]) and file[1].endswith(extension):
+ files.append('.'.join(file))
return files
_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs