Cool, my thoughts added ;)

From: Dmitri Zimine <[email protected]>
Date: Tuesday, April 1, 2014 at 2:51 PM
To: Joshua Harlow <[email protected]>
Cc: "OpenStack Development Mailing List (not for usage questions)" 
<[email protected]>
Subject: Re: [openstack-dev] [Mistral] How Mistral handling long running 
delegate tasks

Even more responses inline :)
On Mar 31, 2014, at 8:44 PM, Joshua Harlow 
<[email protected]> wrote:

More responses inline :)

From: Dmitri Zimine <[email protected]>
Date: Monday, March 31, 2014 at 5:59 PM
To: Joshua Harlow <[email protected]>
Cc: "OpenStack Development Mailing List (not for usage questions)" 
<[email protected]>
Subject: Re: [openstack-dev] [Mistral] How Mistral handling long running 
delegate tasks

Inline...
On Mar 27, 2014, at 5:10 PM, Joshua Harlow 
<[email protected]> wrote:

Thanks for the description!

The steps here seem very much like what a taskflow engine does (which is good).

To connect this to how I think it could work in taskflow:

  1.  Someone creates tasks/flows describing the work to-be-done (converting a 
DSL -> taskflow tasks/flows/retry[1] objects…)
  2.  On execute(workflow) engine creates a new workflow execution, computes 
the first batch of tasks, creates executor for those tasks (remote, local…) and 
executes those tasks.
  3.  Waits for response back from 
futures<http://docs.python.org/dev/library/concurrent.futures.html> returned 
from executor.
  4.  Receives futures responses (or receives new response DELAY, for example), 
or exceptions…
  5.  Continues sending out batches of tasks that can still be executed 
(aka tasks that don't have a dependency on the output of delayed tasks).
  6.  If any delayed tasks remain after repeating #2-5 as many times as it can, 
the engine will shut itself down (see http://tinyurl.com/l3x3rrb).
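
A minimal sketch of steps 2-6 above, using only the standard library's concurrent.futures. This is not taskflow's engine or API: the DELAY sentinel, the run() helper and the tasks/deps dictionaries are all hypothetical names made up for illustration.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

DELAY = object()  # hypothetical sentinel: "my result will arrive later"


def run(tasks, deps, executor):
    """Run until finished, or until only delayed tasks block progress.

    tasks: name -> callable taking a dict of results produced so far
    deps:  name -> set of task names that must have results first
    Returns (results, delayed_task_names).
    """
    results, delayed, running = {}, set(), {}
    while True:
        # Schedule every not-yet-started task whose dependencies have results.
        for name, fn in tasks.items():
            started = (name in results or name in delayed
                       or name in running.values())
            if not started and deps.get(name, set()) <= set(results):
                running[executor.submit(fn, dict(results))] = name
        if not running:
            # Nothing can make progress any more: shut down (step 6).
            return results, delayed
        done, _ = wait(list(running), return_when=FIRST_COMPLETED)
        for fut in done:
            name = running.pop(fut)
            value = fut.result()  # re-raises the task's exception, if any
            if value is DELAY:
                delayed.add(name)
            else:
                results[name] = value


tasks = {
    "a": lambda r: 1,
    "b": lambda r: DELAY,       # a long running delegate
    "c": lambda r: r["a"] + 1,  # independent of the delayed task
    "d": lambda r: r["b"],      # blocked behind the delayed task
}
deps = {"c": {"a"}, "d": {"b"}}

with ThreadPoolExecutor(max_workers=2) as pool:
    results, delayed = run(tasks, deps, pool)
```

Here "c" still runs because it only needs "a", while "d" is never scheduled because "b" is delayed; the loop then returns instead of waiting.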

Why would the engine treat long running tasks differently? The model Mistral 
tried out is that the engine sends the batch of tasks and goes to sleep; the 
'worker/executor' calls the engine back when the task(s) complete. Can it be 
applied

Not all ways of using taskflow I think want to go 'asleep', some active 
conductors I would think actually stay 'awake', that's why I was thinking this 
would be a compromise, the usage of 'DELAY' (via exception or return) would 
allow a task to say it is going to be finished sometime in the future (btw 
started this @ https://review.openstack.org/#/c/84277/ --- WIP, likely won't go 
into taskflow 0.2 but seems like a reasonable start on something like this 
proposal). To me offering just the going asleep way means that all users of 
taskflow need to have some type of entrypoint (rest, mq, other…) that restarts 
the engine on result finishing. I'd rather not force that model onto all users 
(since that seems inappropriate).
I understand and agree with not enforcing this model on all users. On the flip 
side, TaskFlow enforces the current model on Mistral :) It calls for supporting 
more models (to your earlier point of 3rd mode of operations). The BP proposal 
is a step in the direction but still a compromise. How can we support both 
'active' and 'passive' models, sharing the same 'library' - of all things 
responsible for control flow and data flow?

To me this is what engine 'types' are for, to support different execution 
models while using as many of the same components as possible (making usage as 
seamless as can be).


Here is a strawman (DISCLAIMER: this is sharing ideas and brainstorming: I 
don't know enough of TaskFlow to suggest):
1) break the run loop: add an engine.run_async() method which, instead of 
looping over the tasks until the flow completes (http://tinyurl.com/nou2em9), 
only schedules the first batch and returns;
https://github.com/openstack/taskflow/blob/master/taskflow/engines/action_engine/graph_action.py#L61-L68

Seems like a possibility, although I'd like to make this still a new engine type 
that does this (or maybe the existing engine types wrap this change to retain 
the usage that exists). This just seems like an extension of DELAY thought, 
where at each iteration the engine would return DELAYed, then the caller (the 
one executing) would become aware that this has happened and decide how to 
handle this accordingly. Exposing 'run_async' still doesn't feel right, maybe 
it's just wording, but it seems like an 'async/lazy' engine just still has a 
run() method that returns DELAYed more often than an engine that loops and 
never returns DELAYed (or only returns DELAYed if a task causes this to 
happen). To me they are the same run() method, just different implementations 
of run(). Exposing the semantics of running by naming run_async exposes the 
underlying execution abstraction, which imho isn't the right way to make this 
happen.
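
To make the "same run(), different implementations" point concrete, here is a rough sketch. None of these classes exist in taskflow; BaseEngine, LoopingEngine, LazyEngine and the State enum are hypothetical names, and real engines obviously do far more than pop batches off a list.

```python
import enum


class State(enum.Enum):
    DELAYED = "DELAYED"
    SUCCESS = "SUCCESS"


class BaseEngine:
    """Hypothetical engine: shared scheduling pieces, only run() varies."""

    def __init__(self, batches):
        self._batches = list(batches)  # each batch is a list of callables
        self.results = []

    def _run_next_batch(self):
        for task in self._batches.pop(0):
            self.results.append(task())

    def run(self):
        raise NotImplementedError


class LoopingEngine(BaseEngine):
    def run(self):
        # 'Active' model: keep looping until nothing is left.
        while self._batches:
            self._run_next_batch()
        return State.SUCCESS


class LazyEngine(BaseEngine):
    def run(self):
        # 'Passive' model: do one batch, then hand control back to the caller.
        if self._batches:
            self._run_next_batch()
        return State.DELAYED if self._batches else State.SUCCESS


batches = [[lambda: 1], [lambda: 2]]

looping_state = LoopingEngine(batches).run()

lazy = LazyEngine(batches)
lazy_states = [lazy.run(), lazy.run()]
```

The caller sees the same run() in both cases; the lazy engine just returns DELAYED once before it returns SUCCESS, and it is up to the caller to decide when to call run() again.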


2) introduce an engine.task_complete(…) method that marks the current task 
complete, computes the next tasks ready for execution, schedules them and 
returns; loosely, doing this part: 
https://github.com/openstack/taskflow/blob/master/taskflow/engines/action_engine/graph_action.py#L75-L85

Possibly, although to me this is still exposing the internals of engines to 
users who shouldn't care (or only care if they are specifying an engine type 
that gives them access to these details). Allowing public access to these APIs 
worries me in that they are now the API (which goes back to having an engine 
type that exposes these, if that's desired, and if we are willing to accept the 
consequences of exposing them).


3) make the executor signal the engine back by calling engine.task_complete(…): 
executor.execute_task() submits the task to the worker queue and calls 
engine.task_complete(status, results, etc.) instead of waiting for task 
completion and returning the results.

And it looks like it all can be done as a third lazy_engine, without messing 
with the current two engines?
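
A rough sketch of what steps 1-3 of the strawman could look like as a separate engine type. This is brainstorming code, not taskflow code: PassiveEngine, its start()/task_complete() methods and the submit callable are hypothetical, and a plain list stands in for the worker queue.

```python
class PassiveEngine:
    """Hypothetical 'lazy' engine: it never loops, it only reacts to calls."""

    def __init__(self, deps, submit):
        self._deps = deps      # task name -> set of prerequisite task names
        self._submit = submit  # callable(name): hands a task to a worker queue
        self._scheduled = set()
        self.results = {}

    def _schedule_ready(self):
        for name, needs in self._deps.items():
            if name not in self._scheduled and needs <= set(self.results):
                self._scheduled.add(name)
                self._submit(name)

    def start(self):
        # Schedule the first batch and return immediately (step 1).
        self._schedule_ready()

    def task_complete(self, name, result):
        # Called back by the worker/executor (steps 2 and 3): record the
        # result, then schedule whatever that result has unblocked.
        self.results[name] = result
        self._schedule_ready()


queue = []  # stand-in for a real worker queue
engine = PassiveEngine({"a": set(), "b": {"a"}, "c": {"a", "b"}}, queue.append)

engine.start()
after_start = list(queue)

engine.task_complete("a", 1)
after_a = list(queue)

engine.task_complete("b", 2)
after_b = list(queue)
```

Between calls the engine holds no thread at all, which is the 'going asleep' behaviour discussed above; the price is that task_complete() becomes part of the public API.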

Possibly, I'm still of the thought that not all engine types should expose this 
information (due to above public API problem, once it's public it's public for a 
long time).






  7.  On a delayed task finishing, some API/webhook/other (the mechanism imho 
shouldn't be tied to webhooks, at least not in taskflow, but should be left up 
to the user of taskflow to decide how to accomplish this) will be/must be 
responsible for resuming the engine and setting the result for the previous 
delayed task.

Oh no, webhook is the way to expose it to 3rd party system. From the library 
standpoint it's just an API call.

One can do it even now by getting the appropriate Flow_details, instantiating 
an engine (flow, flow_details) and running it to continue from where it left 
off. Is that what you mean? But I keep on dreaming of a passive version of 
TaskFlow engine which treats all tasks the same and exposes one extra method - 
handle_tasks.

I was thinking that, although I'm unsure on the one extra method idea, can u 
explain more :) What would handle_tasks do? Restart/resume the engine 
(basically the work u stated 'getting the appropriate Flow_details, 
instantiating an engine (flow, flow_details) and running it to continue')? 
Seems like a tiny helper function if it's really wanted, but maybe I'm 
misunderstanding.  It might be connected into a recent taskflow BP @ 
https://blueprints.launchpad.net/taskflow/+spec/generic-flow-conductor?

handle_tasks: oops, I meant engine.task_complete() - the engine's callback to 
receive the task results. See above.
Again, I don't know enough to suggest TaskFlow solutions. Just brainstorming.
@Kirill, can you also give your view here?

  8.  Repeat 2 -> 7 until all tasks have executed/failed.
  9.  Profit!

This seems like it could be accomplished, although there are race conditions in 
#6 (what if multiple delayed requests are received at the same time)? What 
locking is done to ensure that this doesn't cause conflicts?
The engine must handle concurrent calls of mutation methods - start, stop, 
handle_action. How it does so differs depending on whether the engine runs in 
multiple threads or in an event loop on queues of calls.

Agreed, to me this requires some level of locking, likely something that the 
tooz<https://review.openstack.org/#/c/71167/> library can provide (or in 
concept is similar to what the 
jobboard<https://wiki.openstack.org/wiki/TaskFlow/Paradigm_shifts#Workflow_ownership_transfer>
 concept provides, single 'atomic' engine access to a given workflow, ensuring 
this with a distributed locking scheme, such as zookeeper).
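
As an illustration of 'single atomic engine access to a given workflow', here is an in-process sketch. It deliberately uses threading.Lock as a stand-in for a distributed lock (tooz, zookeeper, ...) and does not use any of their APIs; ExecutionLocks, task_complete() and the state dictionary are hypothetical.

```python
import threading
from collections import defaultdict


class ExecutionLocks:
    """In-process stand-in for a distributed lock service (hypothetical)."""

    def __init__(self):
        self._guard = threading.Lock()
        self._locks = defaultdict(threading.Lock)

    def for_execution(self, execution_id):
        # Guard the lookup so two threads never create two different locks
        # for the same execution.
        with self._guard:
            return self._locks[execution_id]


locks = ExecutionLocks()
state = {"exec-1": []}  # execution id -> completed (task, result) pairs


def task_complete(execution_id, task, result):
    # Only one callback at a time may read-modify-write a given execution.
    with locks.for_execution(execution_id):
        completed = state[execution_id]
        state[execution_id] = completed + [(task, result)]


# Simulate several delayed tasks reporting back at the same time.
threads = [
    threading.Thread(target=task_complete, args=("exec-1", f"t{i}", i))
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Callbacks for different executions do not block each other; only callbacks for the same execution are serialized. With more than one engine process the lock would have to live outside the process, which is where a distributed lock comes in.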


Does the POC solve that part (no simultaneous step #5 from below)?
Yes although we may want to revisit the current solution.

Is it using some form of database locking? :(


There was a mention of a watch-dog (ideally to ensure that delayed tasks can't 
just sit around forever), was that implemented?
If _delayed_ tasks and 'normal' tasks are treated alike, this is just a matter of 
timeout as a generic property on a task. So Mistral didn't have to have it. For 
the proposal above, a separate treatment is necessary for _delayed_ tasks.

Who responds to the timeout though? Isn't that process the watchdog then? 
Likely the triggering of a timeout causes something to react (in both cases).
The workflow engine IS this watch-dog (and in Mistral, the engine is a single 
manager for all flow executions; in the prototype we call it engine_manager and 
I hate this name :)). The engine lives in-process or as a separate process. And 
it is passive: it executes in the caller's thread. E.g., in-process it is called 
either on a messaging event handler thread or by a web method thread.
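
A small sketch of timeout as a generic property on a task, tracked by a passive engine. Everything here is hypothetical (TimeoutTracker and its methods are not Mistral or taskflow names), and the clock is injectable only so the example does not need to sleep.

```python
import time


class TimeoutTracker:
    """Hypothetical helper an engine could consult whenever it is called."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._deadlines = {}  # task id -> absolute deadline

    def task_started(self, task_id, timeout):
        self._deadlines[task_id] = self._clock() + timeout

    def task_complete(self, task_id):
        self._deadlines.pop(task_id, None)

    def expire(self):
        """Return (and forget) the tasks whose deadline has passed."""
        now = self._clock()
        overdue = [t for t, d in self._deadlines.items() if d <= now]
        for task_id in overdue:
            del self._deadlines[task_id]
        return overdue


now = [0.0]  # fake clock for the example
tracker = TimeoutTracker(clock=lambda: now[0])

tracker.task_started("normal", timeout=5)
tracker.task_started("delegate", timeout=50)
tracker.task_started("quick", timeout=5)
tracker.task_complete("quick")

now[0] = 10.0
overdue = tracker.expire()
```

Normal and delegate tasks are handled identically here. Note that a passive engine still needs some thread (a periodic job, or the next incoming call) to invoke expire(), which is the 'who responds to the timeout' question.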

Right, which in mistral is a web-server right (aka, wherever mistral is set up) 
since the tasks finish by calling a rest-endpoint (or something sitting on MQ?)?




[1] https://wiki.openstack.org/wiki/TaskFlow#Retries (new feature!)
This is nice. I would call it a 'repeater': running a sub flow several times 
with various data for various reasons is richer than 'retry'.
What about the 'retry policy' on an individual task?

We went back and forth on names a few times (controller, retry, and a few 
others I think), haha. Retry is what currently stuck :-P

Repeater does sound nice also though, damn naming of 'things' ;)

For retries on individual tasks since taskflow supports flow nesting it's not 
that hard to do this (although it might not be the most elegant).

>>> flow = LinearFlow("blah blah")
>>> flow.add(LinearFlow("subblahblah", retry=XYZ).add(OneTask()))
>>> # ^ where the one task is literally one task, haha.

A running example @ http://tinyurl.com/kv8q2t8

Room for improvement obviously (isn't there always), but that would/should work.

That may be good enough: when DSL is translated to flow, and the task demands 
repetition with timeout, it's ok to do this trick under the hood when compiling 
a flow.
flow.add(LinearFlow("subblahblah", 
retry=XYZ).add(OneTask()).add(Timeout(timeout)))

Yup, in a way most language compilers do all these types of tricks under the 
hood (in much much more complicated manners); as long as we retain 'user 
intention' (aka don't mess with how the code executes) we should be able to do 
any tricks we want (in fact most compilers do many many tricks). To me the same 
kind of tricks start to become possible after we get the basics right (can't do 
optimizations, aka -O2, if u don't have basics in the first place).



-Josh



From: Dmitri Zimine <[email protected]<mailto:[email protected]>>
Reply-To: "OpenStack Development Mailing List (not for usage questions)" 
<[email protected]<mailto:[email protected]>>
Date: Thursday, March 27, 2014 at 4:43 PM
To: "OpenStack Development Mailing List (not for usage questions)" 
<[email protected]<mailto:[email protected]>>
Subject: [openstack-dev] [Mistral] How Mistral handling long running delegate 
tasks

Following up on http://tinyurl.com/l8gtmsw and http://tinyurl.com/n3v9lt8: this 
explains how Mistral handles long running delegate tasks. Note that a 'passive' 
workflow engine can handle both normal tasks and delegates the same way. I'll 
also put that on ActionDesign wiki, after discussion.

Diagram:
https://docs.google.com/a/stackstorm.com/drawings/d/147_EpdatpN_sOLQ0LS07SWhaC3N85c95TkKMAeQ_a4c/edit?usp=sharing

1. On start(workflow), engine creates a new workflow execution, computes the 
first batch of tasks, sends them to ActionRunner [1].
2. ActionRunner creates an action and calls action.run(input)
3. Action does the work (compute (10!)), produces the results, and returns the 
results to executor. If it returns, status=SUCCESS. If it fails, it throws an 
exception, status=ERROR.
4. ActionRunner notifies Engine that the task is complete: task_done(execution, 
task, status, results)[2]
5. Engine computes the next task(s) ready to trigger, according to control flow 
and data flow, and sends them to ActionRunner.
6. Like step 2: ActionRunner calls the action's run(input)
7. A delegate action doesn't produce results: it calls out to the 3rd party 
system, which is expected to make a callback to a workflow service with the 
results. It returns to ActionRunner without results, "immediately".
8. ActionRunner marks status=RUNNING [?]
9. 3rd party system takes 'long time' == longer than any system component can 
be assumed to stay alive.
10. 3rd party component calls Mistral WebHook which resolves to 
engine.task_complete(workbook, id, status, results)

Comments:
* One Engine handles multiple executions of multiple workflows. It exposes two 
main operations: start(workflow) and task_complete(execution, task, status, 
results), and is responsible for defining the next batch of tasks based on 
control flow and data flow. Engine is passive - it runs in the host's thread. 
Engine and ActionRunner communicate asynchronously via task queues; for details, 
see https://wiki.openstack.org/wiki/Mistral/POC

* Engine doesn't distinguish sync and async actions, it doesn't deal with Actions 
at all. It only reacts to task completions, handling the results, updating the 
state, and queuing next set of tasks.

* Only Action can know and define if it is a delegate or not. Some protocol is 
required to let ActionRunner know that the action is not returning the results 
immediately. A convention of returning None may be sufficient.

* Mistral exposes  engine.task_done in the REST API so 3rd party systems can 
call a web hook.
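
A minimal sketch of the 'returning None means delegate' convention from the comments above. This is not the Mistral code: the ActionRunner class below, the simplified task_done(task_id, status, result) signature and the status constants are assumptions made for illustration only.

```python
SUCCESS, ERROR, RUNNING = "SUCCESS", "ERROR", "RUNNING"


class ActionRunner:
    """Hypothetical runner; task_done stands in for the engine callback."""

    def __init__(self, task_done):
        self._task_done = task_done
        self.status = {}

    def run(self, task_id, action, action_input):
        try:
            result = action(action_input)
        except Exception as exc:
            # The action failed: report ERROR to the engine (step 3).
            self.status[task_id] = ERROR
            self._task_done(task_id, ERROR, exc)
            return
        if result is None:
            # Delegate: the 3rd party system will call back with the
            # results later, so nothing is reported yet (steps 7 and 8).
            self.status[task_id] = RUNNING
            return
        self.status[task_id] = SUCCESS
        self._task_done(task_id, SUCCESS, result)


events = []
runner = ActionRunner(lambda *args: events.append(args))

runner.run("sync", lambda x: x * 2, 21)     # normal action
runner.run("delegate", lambda x: None, 0)   # calls out, returns immediately

# Much later, the web hook resolves to the same engine callback (step 10).
events.append(("delegate", SUCCESS, "3rd party result"))
```

The engine side only ever sees task_done-style calls, so it cannot tell whether a result came from a normal action or from the web hook. One obvious downside of the convention is that a normal action can no longer legitimately return None.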

DZ.

[1]  I use ActionRunner instead of Executor (current name) to avoid confusion: 
it is Engine which is responsible for executions, and ActionRunner only runs 
actions. We should rename it in the code.

[2] I use task_done for brevity and out of pure spite; in the code it is 
conveny_task_results.


