I wrote this on 2008-04-20, but hadn't gotten around to sending it out
until now.  It represents an approach to hardware architecture that's
basically fallen by the wayside, but it's interesting to think about,
even if nobody's found a way to make it practical yet.

Like everything else posted to kragen-hacks without any notice to the
contrary, this program is in the public domain; I abandon any
copyright in it.

#!/usr/bin/python
"""
Simulate a "dynamic dataflow" machine.

My understanding of this comes from
<http://thesz.mskhug.ru/browser/hiersort/doc/sdd2.pdf?format=raw>,
"Dynamic Dataflow Architecture with Token Sorting", and [some lecture
slides from Slovenia](http://csd.ijs.si/courses/dataflow/index.htm),
"Dataflow Architectures" by Jurij Silc.  (I read some of the
background afterwards, including Ellen Spertus's thesis and a little
of the work of the other students under Arvind in the 1980s, and my
understanding seems to have been more or less right.)

The basic idea, as I understand it, is as follows.  Every computation
has two arguments, and produces zero or more "message sends".  Each
message send consists of a tag, a destination port, and a datum.  Memory
consists of a bunch of stored message sends.  At each step, memory is
searched for two stored messages with the same tag.  Their data become
the two arguments to a new computation, which also has access to the
tag, and the two messages are removed from memory.
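
As a rough sketch of that matching rule: the names below are made up
for illustration, and the sketch pairs messages as they arrive rather
than searching memory at each step, which is a simplification.

```python
# Hypothetical names throughout; this is an illustration, not the paper's design.
memory = {}   # tag -> (port, datum): at most one waiting message per tag
fired = []    # computations whose two arguments have both arrived

def send(tag, port, datum):
    # Look for a stored message with the same tag.
    if tag in memory:
        other_port, other_datum = memory.pop(tag)   # remove it from memory
        fired.append((tag, {port: datum, other_port: other_datum}))
    else:
        memory[tag] = (port, datum)                 # wait for a partner

send('add#1', 'x', 3)    # no partner yet, so it is stored
send('add#2', 'x', 10)   # different tag, so it is stored too
send('add#1', 'y', 4)    # matches the first message, so a computation fires
```

Afterwards `fired` holds one computation for tag 'add#1' with both
arguments, and the 'add#2' message is still waiting in `memory`.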

To the memory, the tag can be just an opaque bitstring, but the
execution unit uses the tag to figure out what code to run in the new
computation.  Possibly only some of the tag bits matter for selecting
the code; the remaining tag bits are still available to the
computation.
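
One way to picture that split, assuming (purely for illustration) that
a tag is a pair of a node name and some indices, and that only the
node name selects the code:

```python
# Assumed tag layout: (node_name, indices).  The function and table
# names are hypothetical.
def multiply(indices, x, y):
    return ('product', indices, x * y)

def add(indices, x, y):
    return ('sum', indices, x + y)

code_table = {'M': multiply, 'S': add}

def execute(tag, x, y):
    # The memory only compares whole tags; the execution unit is the
    # part that looks inside one.
    node_name, indices = tag
    # Only node_name picks the code, but the indices are passed along.
    return code_table[node_name](indices, x, y)

result = execute(('M', (1, 2, 3)), 6, 7)
```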

The idea is that you can run a lot of these computation steps in
parallel, because you can fire off a new computation any time both of
its arguments become available.  The idea is *not* that this is some
kind of improvement in expressiveness or clarity of your program code.
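
A small sketch of why the firing order doesn't matter: it evaluates
(1 + 2) * (3 + 4) with three made-up nodes, picking a random ready
computation at each step.  The node names and the `evaluate` function
are assumptions for illustration.  A sequential simulation can only
suggest the parallelism; it doesn't demonstrate any speedup.

```python
import random

def evaluate(seed):
    rng = random.Random(seed)
    memory, ready, out = {}, [], {}

    def send(tag, port, datum):
        if tag in memory:
            other_port, other = memory.pop(tag)
            ready.append((tag, {port: datum, other_port: other}))
        else:
            memory[tag] = (port, datum)

    # Inputs for (a + b) * (c + d) with a, b, c, d = 1, 2, 3, 4.
    for tag, port, datum in [('add1', 'x', 1), ('add2', 'x', 3),
                             ('add1', 'y', 2), ('add2', 'y', 4)]:
        send(tag, port, datum)

    max_ready = 0
    while ready:
        max_ready = max(max_ready, len(ready))
        # Any ready computation will do; take one at random.
        tag, args = ready.pop(rng.randrange(len(ready)))
        if tag == 'add1':
            send('mul', 'x', args['x'] + args['y'])
        elif tag == 'add2':
            send('mul', 'y', args['x'] + args['y'])
        else:
            out['result'] = args['x'] * args['y']
    return out['result'], max_ready

results = [evaluate(seed) for seed in range(10)]
```

Every seed gives the same product, and at one point both additions
are ready at once, so a machine with two execution units could run
them at the same time.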

Zefirov, Stepanov, and Klimov's paper gives this example matrix
multiply state machine:

    group G {i,j,k} { // indices i,j, and k represent the tag
                       // and are implicit inside group.
        node M (x,y) {
            send (x*y) to S.y;
        }
        node S (x,y) {
            if (k<N)
                 send (x+y) to S.x {i,j,k+1};
            else
                 send (x+y) to (array C receiver) {i,j};
        }
    }

They write:

> To start this program we ought to send zeroes to S.x {1..N,1..N,1},
> A[i][k] to M.x{i,1..N,k} and B[k][j] to M.y{1..N,k,j}. This is a
> task in itself, it requires O(N^3) sends. We will discuss it further
> below.

In the above, the capital letters and the indices in {} are part of
the tag, while the .x or .y is the "destination port".
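
Here is a self-contained sketch of the whole example: it performs
those initial sends and then fires M and S until nothing is ready.
The function name, the string node names, and the dict-based memory
are inventions for illustration, not anything from the paper.

```python
def matmul_dataflow(A, B):
    N = len(A)
    memory = {}   # tag -> (port, datum) waiting for its partner
    ready = []    # (tag, {port: datum}) computations ready to fire
    C = {}        # stands in for the "array C receiver"

    def send(node, indices, port, datum):
        tag = (node, indices)
        if tag in memory:
            other_port, other = memory.pop(tag)
            ready.append((tag, {port: datum, other_port: other}))
        else:
            memory[tag] = (port, datum)

    # The initial sends from the quoted paragraph, with 1-based indices.
    sends = 0
    for i in range(1, N + 1):
        for j in range(1, N + 1):
            send('S', (i, j, 1), 'x', 0)
            sends += 1
            for k in range(1, N + 1):
                send('M', (i, j, k), 'x', A[i - 1][k - 1])
                send('M', (i, j, k), 'y', B[k - 1][j - 1])
                sends += 2

    while ready:
        (node, (i, j, k)), args = ready.pop()
        x, y = args['x'], args['y']
        if node == 'M':
            send('S', (i, j, k), 'y', x * y)
        elif k < N:
            send('S', (i, j, k + 1), 'x', x + y)
        else:
            C[i, j] = x + y
    assert memory == {}   # every message found its partner
    return C, sends

C, sends = matmul_dataflow([[1, 2], [3, 4]], [[5, 6], [7, 8]])
```

For this 2x2 case the setup takes 2*N^3 + N^2 = 20 sends, and C[1, 1]
comes out as 1*5 + 2*7 = 19.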

How should that look in Python?  You could imagine phrasing

    send (x+y) to S.x {i,j,k+1};

as

    S[i, j, k+1].x = x + y

Then you could rewrite the above as

    node M {i,j,k} (x,y): S[i, j, k].y = x * y
    node S {i,j,k} (x,y):
        if k < N: S[i, j, k+1].x = x+y
        else: C[i, j] = x+y

But the "node" lines are a problem.

There's no way in Python to write a new kind of subroutine, nor to
write a lambda whose body contains statements, nor to implicitly bind
a name within a scope.  But you could write this:

    class M(node):
        def run((i, j, k), x, y): S[i, j, k].y = x * y
    class S(node):
        def run((i, j, k), x, y):
            if k < N: S[i, j, k+1].x = x+y
            else: C[i, j] = x+y

Or, with decorator syntax, this:

    @node
    def M((i, j, k), x, y): S[i, j, k].y = x * y
    @node
    def S((i, j, k), x, y):
        if k < N: S[i, j, k+1].x = x+y
        else: C[i, j] = x+y

So how can we make that work?

"""

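class Machine:
    "The matching store, plus a list of computations that are ready to run."
    def __init__(self):
        self.memory = {}  # tag -> (argname, value) awaiting a partner
        self.tasks = []   # (tag, {argname: value}) pairs ready to run
    def run(self):
        "Run one ready task (the most recently postponed one)."
        (node, indices), args = self.tasks.pop()
        self.message(("running", node, indices, args))
        node.run(indices, args)
    def retrieve(self, tag):
        "Destructive retrieve by tag."
        rv = self.memory[tag]
        del self.memory[tag]
        return rv
    def send(self, tag, argname, value):
        "Store a message, or pair it with a stored one and schedule a task."
        try: old_argname, old_value = self.retrieve(tag)
        except KeyError: self.memory[tag] = argname, value
        else:
            # The two messages for a tag must arrive on different ports.
            assert argname != old_argname
            self.postpone(tag, {argname: value, old_argname: old_value})
    def postpone(self, tag, data): self.tasks.append((tag, data))
    def message(self, message): print message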

class Node:
    def __init__(self, machine, function, argnames, name = '(anon)'):
        self.machine = machine
        self.function = function
        assert len(argnames) == 2, argnames
        self.argnames = argnames
        self.name = name
    def __repr__(self): return '<node %s>' % self.name
    def __getitem__(self, indices): return NodeReceiver(self, indices)
    def assign(self, indices, argname, value):
        assert argname in self.argnames, (argname, self.argnames)
        self.machine.send((self, indices), argname, value)
    def run(self, indices, args):
        arglist = [indices] + [args[argname] for argname in self.argnames]
        self.function(*arglist)

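class NodeReceiver:
    "What S[i, j, k] returns: assigning to an attribute of it sends a message."
    def __init__(self, node, indices):
        # Go through __dict__ so that __setattr__ below isn't triggered.
        self.__dict__['node'], self.__dict__['indices'] = node, indices
    def __setattr__(self, attr, value):
        self.node.assign(self.indices, attr, value)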

machine = Machine()

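def magically_get_argument_names(function):
    # The positional parameter names come first in co_varnames.  A tuple
    # parameter like (i, j, k) occupies one slot under a placeholder name.
    code = function.func_code
    return code.co_varnames[:code.co_argcount]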

def node(function):
    "A decorator."
    return Node(machine, function, magically_get_argument_names(function)[1:],
                name = function.func_name)

def test():
    "Regression test."

    @node
    def M((i, j, k), x, y): S[i, j, k].g = x * y

    @node
    def S((i, j, k), f, g):
        if k < N: S[i, j, k+1].f = f+g
        else: C[i, j] = f+g

    C = {}
    N = 4
    S[3, 4, 4].f = 3
    S[3, 4, 4].g = 4
    machine.run()
    assert machine.memory == {}
    assert machine.tasks == []
    assert C == {(3, 4): 7}

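    # M's product should arrive at S.g and pair with the waiting S.f.
    S[1, 2, 4].f = 1
    M[1, 2, 4].x = 3
    M[1, 2, 4].y = 4
    machine.run()               # runs M, which sends 12 to S[1, 2, 4].g
    machine.run()               # runs S, which stores 1 + 12 in C
    assert C[1, 2] == 13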

if __name__ == '__main__': test()

