#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Small-data implementation of mapreduce.

Mapreduce is best known as a framework for efficient and
fault-tolerant massively-distributed big-data processing, such as
rebuilding Google’s search index.  By virtue of restricting the
dataflow, it gains fault-tolerance and almost-linear scalability.

However, it turns out that the Mapreduce abstraction has a couple of
other virtues as well.  Mapreduce algorithms are constrained to need
only sequential data access — at least, if you use a sorting algorithm
that only needs sequential data access, and the result sets for any
given intermediate key are bounded in size — so you can execute them
reasonably efficiently on data that’s much bigger than your RAM; and
there are also some algorithms whose expression is simpler and more
natural given mapreduce as a primitive.

This is a very simple implementation of mapreduce, not suitable for
large data, specifically intended for this last case: algorithms whose
expression is simplified by using mapreduce.

"""

import itertools
import re
import sys
import textwrap


def count_words_in_filename(filename):
    """Example #1 of mapreduce: tally how often each word occurs.

    Opens the named file and hands it to count_words_in_file, whose
    result is returned unchanged.
    """
    with open(filename) as text_file:
        tallies = count_words_in_file(text_file)
    return tallies

def count_words_in_file(fo):
    """Count occurrences of each word read from an open file object.

    Does the same job as count_words_in_filename, but the caller
    supplies the already-open file.
    """
    def one_per_word(word):
        # Every occurrence contributes a single (word, 1) pair, so
        # summing a word's values yields its number of occurrences.
        return [(word, 1)]

    return mapreduce(words_in_file(fo), one_per_word, sum)

def words_in_file(fo):
    """Return an iterator over every word in a text file, line by line.

    Used by the examples; each line is split with words_in_line.
    """
    words_per_line = (words_in_line(text) for text in fo)
    return itertools.chain.from_iterable(words_per_line)


def concordance_of_filename(filename):
    """Example #2 of mapreduce: build a concordance for the named file.

    Opens the file and hands it to concordance_of_file, whose result
    is returned unchanged.
    """
    with open(filename) as text_file:
        concordance = concordance_of_file(text_file)
    return concordance

def concordance_of_file(fo):
    """Build a concordance from an open file object.

    Does the same job as concordance_of_filename, but the caller
    supplies the already-open file.  Returns what mapreduce returns:
    an iterable of (word, line_numbers) pairs sorted by word, where
    line_numbers is a list holding the 1-based number of the line of
    each occurrence of the word.
    """
    def mapfunc(numbered_line):
        # Unpack inside the body rather than in the signature: tuple
        # parameters are a syntax error on Python 3 (PEP 3113), while
        # this form works on both Python 2 and Python 3.
        line_no, line = numbered_line
        # enumerate() counts from 0; line numbers are reported from 1.
        return ((word, line_no+1) for word in words_in_line(line))
    return mapreduce(enumerate(fo), mapfunc, list)


def words_in_line(line):
    """Return a list of the words in a line of text.

    A word is a maximal run of regular-expression word characters;
    everything else (whitespace, punctuation) acts as a separator.
    The words are returned in the order they appear in the line.
    """
    # Raw string so the backslash reaches the regex engine untouched
    # instead of relying on an unrecognized string escape sequence.
    return re.findall(r'\w+', line)


def sort_lines(filename):
    """Example #3 of mapreduce; kind of stupid without parallelism.

    Returns mapreduce's result for the named file: (line, count) pairs
    sorted by line, where count is how many times that exact line
    occurs.
    """
    def one_per_line(text):
        # Each line is its own key; identical lines share a bucket.
        return [(text, 1)]

    with open(filename) as lines:
        ordered = mapreduce(lines, one_per_line, sum)
    return ordered


def mapreduce(iterable, mapfunc, reducefunc):
    """Run a simple, in-memory (nonscalable) mapreduce.

    Every item of iterable is passed to mapfunc, which must return an
    iterable of (key, value) pairs.  All values that share a key are
    collected together, in the order they were produced, and
    reducefunc is then called once per key with the list of that
    key's values.

    Returns an iterable yielding one (key, reduced_value) pair per
    distinct key, in sorted key order, where reduced_value is whatever
    reducefunc returned for that key's values.

    """

    grouped = {}

    # Map phase: runs to completion before this function returns, so
    # the input iterable is fully consumed by then.
    for element in iterable:
        for k, v in mapfunc(element):
            grouped.setdefault(k, []).append(v)

    # Keys are sorted right away; reducefunc itself is only invoked
    # as the returned generator is consumed.
    ordered_keys = sorted(grouped)
    return ((k, reducefunc(grouped[k])) for k in ordered_keys)


if __name__ == '__main__':
    # Script mode: print the word counts and then the concordance of
    # the file named by the first command-line argument.
    if len(sys.argv) < 2:
        # Fail with a readable message rather than a bare IndexError.
        sys.exit("usage: %s FILENAME" % sys.argv[0])

    # Each print(...) call below takes exactly one parenthesized
    # argument, so it produces the same output whether print is a
    # statement (Python 2) or a function (Python 3).
    for word, count in count_words_in_filename(sys.argv[1]):
        print("%12d %s" % (count, word))

    for word, line_nos in concordance_of_filename(sys.argv[1]):
        # Wrap long lists of line numbers, indenting continuation lines.
        for line in textwrap.wrap(', '.join(map(str, line_nos)),
                                  initial_indent=word+': ',
                                  subsequent_indent='    '):
            print(line)
# --
# To unsubscribe: http://lists.canonical.org/mailman/listinfo/kragen-hacks
#
# Reply via email to