Hi all,

I'd just like to let everyone know about a new feature in taskflow that I think
will be beneficial to various projects (it should reduce the duplication of
similar code across projects that accomplish the same thing). The new feature
is the ability to run tasks in remote workers, while the task transitions and
state persistence are still handled by an 'orchestrating' engine. This means
the engine no longer has to run tasks locally or in threads (or greenthreads)
but can run them on remote machines (anything that can connect to a MQ via
kombu; TBD when this becomes oslo.messaging).


Here is a simple example that shows how this works, for folks that have some
time to try it out.


---------

Pre-setup: git clone the taskflow repo and install it (in a venv or elsewhere),
then install an MQ server such as RabbitMQ.
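
Something along these lines should work (the clone URL and package name are
just examples; adjust for your environment):

$ git clone https://github.com/openstack/taskflow.git
$ cd taskflow && pip install -e .  # inside a venv if desired
$ sudo apt-get install rabbitmq-server  # or your distro's equivalent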

---------


Let's now create two basic tasks (one that says hello and one that says goodbye).


import logging
import time

from taskflow import task

LOG = logging.getLogger(__name__)


class HelloWorldTask(task.Task):
    default_provides = "hi_happened"

    def execute(self):
        LOG.info('hello world')
        return time.time()


class GoodbyeWorldTask(task.Task):
    default_provides = "goodbye_happened"

    def execute(self, hi_happened):
        LOG.info('goodbye world (hi said on %s)', hi_happened)
        return time.time()


* Notice how the GoodbyeWorldTask requires an input 'hi_happened' (which is 
produced by the HelloWorldTask).
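

A quick way to see this wiring is to look at the task objects themselves. A
small check (this assumes the requires/provides attributes that taskflow tasks
expose):

hello = HelloWorldTask()
bye = GoodbyeWorldTask()
# The goodbye task's requirement is inferred from its execute() arguments,
# while the hello task's output comes from default_provides.
print(hello.provides)  # should include 'hi_happened'
print(bye.requires)    # should include 'hi_happened'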


Now let's create a workflow that combines these two.


from taskflow.patterns import linear_flow

f = linear_flow.Flow("hi-bye")
f.add(HelloWorldTask())
f.add(GoodbyeWorldTask())


Notice here that we have specified a linear runtime order (hello will be said 
before goodbye). This ordering is also inherent in the dependency ordering, 
since the goodbye task requires 'hi_happened' to run, and the only way to 
satisfy that dependency is to run HelloWorldTask before GoodbyeWorldTask.


* If you are wondering what the heck this is (or why it is useful to have 
these little task and flow classes), check out 
https://wiki.openstack.org/wiki/TaskFlow#Structure
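

If you just want to see the flow do its thing without any workers involved, it
can also be run locally with the default engine. A minimal sketch (assuming a
taskflow release that provides the taskflow.engines.run helper):

from taskflow import engines

# Runs the flow in-process with the default engine and prints the
# accumulated results (which should contain 'hi_happened' and
# 'goodbye_happened').
print(engines.run(f))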


Now the fun begins!


We need a worker that accepts requests to run tasks, so let's create a small 
function that does just that.


def run_worker():
    worker_conf = dict(MQ_CONF)
    worker_conf.update({
        # These are the available tasks that this worker has access to execute.
        'tasks': [
            HelloWorldTask,
            GoodbyeWorldTask,
        ],
    })

    # Start this up and stop it on ctrl-c
    worker = remote_worker.Worker(**worker_conf)
    runner = threading.Thread(target=worker.run)
    runner.start()
    worker.wait()

    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            LOG.info("Dying...")
            worker.stop()
            runner.join()
            break

And of course we need a function that will perform the orchestration of the 
remote (or local) tasks. This function starts the whole execution by taking 
the workflow defined above and combining it with an engine that will run the 
individual tasks (and transfer data between those tasks as needed).


* For those still wondering what an engine is (or what it offers), check out 
https://wiki.openstack.org/wiki/TaskFlow#Engines and 
https://wiki.openstack.org/wiki/TaskFlow/Patterns_and_Engines/Persistence#Big_Picture
(which will hopefully make it easier to understand why the concept exists in 
the first place).


def run_engine():
    # Make some remote tasks happen.
    f = linear_flow.Flow("test")
    f.add(HelloWorldTask())
    f.add(GoodbyeWorldTask())

    # Create an in-memory storage area where intermediate results will be
    # saved (you can change this to a persistent backend if desired).
    backend = impl_memory.MemoryBackend({})
    _logbook, flowdetail = pu.temporary_flow_detail(backend)

    engine_conf = dict(MQ_CONF)
    engine_conf.update({
        # This identifies which workers are accessible via which queues; this
        # will be made better soon with reviews such as
        # https://review.openstack.org/#/c/75094/
        'workers_info': {
            'work': [
                HelloWorldTask().name,
                GoodbyeWorldTask().name,
            ],
        }
    })

    LOG.info("Running workflow %s", f)

    # Now run the engine.
    e = engine.WorkerBasedActionEngine(f, flowdetail, backend, engine_conf)
    with logging_listener.LoggingListener(e, level=logging.INFO):
        e.run()

    # See the results received.
    print("Final results: %s" % (e.storage.fetch_all()))


Now that we have these two functions we can actually start the worker and the 
engine and watch the action happen. To save you from writing the extra 
boilerplate yourself (imports and such), the full code can be found at 
http://paste.openstack.org/show/73071/.
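

The paste also contains a little command-line dispatch so the same file can
start either role; a rough sketch of that boilerplate (the actual paste may
differ):

import sys

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    if len(sys.argv) > 1 and sys.argv[1] == 'worker':
        run_worker()
    else:
        run_engine()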


To try it out, download the above paste to a file named 'test.py', modify the 
MQ_SERVER global to point to your MQ host, and then run the following.


$ python test.py worker # starts a worker

$ python test.py # starts the main engine


You should start to see state transitions happening and the final result being 
produced by the engine as it coordinates calls with the remote worker(s).


* For the engine the output should be similar to 
http://paste.openstack.org/show/73072/; for the worker it should be similar to 
http://paste.openstack.org/show/73073/.


Hopefully this new model can be useful in the future to heat, glance, ... and 
any other projects that would like to take advantage of this functionality 
(taskflow is a library on pypi that anyone can, and is encouraged to, use).


Questions, concerns, and any other comments are welcome (here or in 
#openstack-state-management).


Thanks!


-Josh
