Even more responses inline :)

On Mar 31, 2014, at 8:44 PM, Joshua Harlow <[email protected]> wrote:
> More responses inline :)
>
> From: Dmitri Zimine <[email protected]>
> Date: Monday, March 31, 2014 at 5:59 PM
> To: Joshua Harlow <[email protected]>
> Cc: "OpenStack Development Mailing List (not for usage questions)" <[email protected]>
> Subject: Re: [openstack-dev] [Mistral] How Mistral handling long running delegate tasks
>
>> Inline...
>> On Mar 27, 2014, at 5:10 PM, Joshua Harlow <[email protected]> wrote:
>>
>>> Thanks for the description!
>>>
>>> The steps here seem very much like what a taskflow engine does (which is good).
>>>
>>> To connect this to how I think this could work in taskflow:
>>> 1. Someone creates tasks/flows describing the work to be done (converting a DSL -> taskflow tasks/flows/retry[1] objects…)
>>> 2. On execute(workflow) the engine creates a new workflow execution, computes the first batch of tasks, creates an executor for those tasks (remote, local…) and executes those tasks.
>>> 3. Waits for responses back from the futures returned by the executor.
>>> 4. Receives the futures' responses (or receives a new response DELAY, for example), or exceptions…
>>> 5. Continues sending out batches of tasks that can still be executed (aka tasks that don't depend on the output of delayed tasks).
>>> 6. If any tasks are still delayed after repeating #2-5 as many times as it can, the engine will shut itself down (see http://tinyurl.com/l3x3rrb).
>>
>> Why would the engine treat long running tasks differently? The model Mistral tried out is: the engine sends out the batch of tasks and goes asleep; the 'worker/executor' calls the engine back when the task(s) complete. Can it be applied?
>
> Not all ways of using taskflow want to go 'asleep' I think; some active conductors actually stay 'awake'. That's why I was thinking this would be a compromise: the usage of 'DELAY' (via exception or return) would allow a task to say it is going to be finished sometime in the future (btw, started this @ https://review.openstack.org/#/c/84277/ --- WIP, likely won't go into taskflow 0.2, but it seems like a reasonable start on something like this proposal). To me, offering just the going-asleep way means that all users of taskflow need some type of entrypoint (rest, mq, other…) that restarts the engine when a result finishes. I'd rather not force that model onto all users (since that seems inappropriate).

I understand and agree with not enforcing this model on all users. On the flip side, TaskFlow enforces the current model on Mistral :) It calls for supporting more models (to your earlier point of a 3rd mode of operation). The BP proposal is a step in that direction, but still a compromise. How can we support both 'active' and 'passive' models, sharing the same 'library' - of all things responsible for control flow and data flow?
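(Before the strawman, to pin down Josh's 'DELAY' idea above in code - a brainstorming sketch only, in the spirit of this thread; the review is still WIP, so Delay and start_external_work below are invented names, not taskflow API:)

    from taskflow import task

    class Delay(Exception):
        """Made-up signal: 'my result will arrive later, keep going'."""

    def start_external_work(server_id):
        # Stand-in for the real call-out; returns a request id
        # immediately, not the final result.
        return "req-%s" % server_id

    class CallThirdParty(task.Task):
        def execute(self, server_id):
            request_id = start_external_work(server_id)
            # The engine keeps running tasks that don't depend on this one,
            # then suspends itself; whatever receives the 3rd party callback
            # later saves the result and restarts the engine.
            raise Delay(request_id)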
Here is a strawman (DISCLAIMER: this is sharing ideas and brainstorming; I don't know enough of TaskFlow to suggest):

1) Break the run loop: add an engine.run_async() method which, instead of looping over the tasks till the flow completes (http://tinyurl.com/nou2em9), only schedules the first batch and returns: https://github.com/openstack/taskflow/blob/master/taskflow/engines/action_engine/graph_action.py#L61-L68

2) Introduce an engine.task_complete(…) method that marks the current task complete, computes the next tasks ready for execution, schedules them and returns; loosely, doing this part: https://github.com/openstack/taskflow/blob/master/taskflow/engines/action_engine/graph_action.py#L75-L85

3) Make the executor signal the engine back by calling engine.task_complete(…): executor.execute_task() submits the task to the worker queue and calls engine.task_complete(status, results, etc.) instead of waiting for task completion and returning the results.

And it looks like it can all be done as a third lazy_engine, without messing with the current two engines? (See the sketch further below.)

>
>>> 7. On delay task finishing, some API/webhook/other (the mechanism imho shouldn't be tied to webhooks, at least not in taskflow, but should be left up to the user of taskflow to decide how to accomplish this) will be/must be responsible for resuming the engine and setting the result for the previous delayed task.
>>
>> Oh no, a webhook is just the way to expose it to a 3rd party system. From the library standpoint it's just an API call.
>>
>> One can do it even now by getting the appropriate flow_details, instantiating an engine (flow, flow_details) and running it to continue from where it left off. Is that how you mean it? But I keep on dreaming of a passive version of the TaskFlow engine which treats all tasks the same and exposes one extra method - handle_tasks.
>
> I was thinking that, although I'm unsure on the one extra method idea, can u explain more :) What would handle_tasks do? Restart/resume the engine (basically the work u stated 'getting the appropriate flow_details, instantiating an engine (flow, flow_details) and running it to continue')? Seems like a tiny helper function if it's really wanted, but maybe I'm misunderstanding. It might be connected to a recent taskflow BP @ https://blueprints.launchpad.net/taskflow/+spec/generic-flow-conductor?

handle_tasks - oops, I meant engine.task_complete() - the engine's callback to receive the task results. See above. Again, I don't know enough to suggest TaskFlow solutions, just brainstorming. @Kirill, can you also give your view here?

>>> 8. Repeat 2 -> 7 until all tasks have executed/failed.
>>> 9. Profit!
>>>
>>> This seems like it could be accomplished, although there are race conditions in #6 (what if multiple delayed requests are received at the same time)? What locking is done to ensure that this doesn't cause conflicts?
>>
>> The engine must handle concurrent calls to its mutation methods - start, stop, handle_action. How differs depending on whether the engine runs in multiple threads or in an event loop over queues of calls.
>
> Agreed; to me this requires some level of locking, likely something that the tooz library can provide (or in concept is similar to what the jobboard concept provides: single 'atomic' engine access to a given workflow, ensuring this with a distributed locking scheme, such as zookeeper).
>
>>> Does the POC solve that part (no simultaneous step #5 from below)?
>>
>> Yes, although we may want to revisit the current solution.
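(The promised sketch of the strawman lazy engine - toy code with a made-up API, nothing like this exists in taskflow today; 'submit' stands in for executor.execute_task():)

    class LazyEngine(object):
        """Strawman 'third engine': never loops, never blocks."""

        def __init__(self, graph, submit):
            # graph: task name -> set of task names it depends on.
            # submit: callable(task_name) that hands the task to a worker
            # and returns immediately (point 3: the executor never waits).
            self._graph = dict(graph)
            self._submit = submit
            self._submitted = set()
            self._done = {}

        def _schedule_ready(self):
            for name, deps in self._graph.items():
                if name not in self._submitted and deps <= set(self._done):
                    self._submitted.add(name)
                    self._submit(name)

        def run_async(self):
            # Point 1: schedule only the first batch of tasks, then return.
            self._schedule_ready()

        def task_complete(self, name, status, results):
            # Point 2: record the result, schedule whatever just became
            # ready, and return to the caller's thread (web handler, MQ
            # consumer, ...) - the engine owns no loop and no thread.
            self._done[name] = (status, results)
            self._schedule_ready()

    engine = LazyEngine({"a": set(), "b": {"a"}}, submit=print)
    engine.run_async()                        # schedules "a" and returns
    engine.task_complete("a", "SUCCESS", 42)  # schedules "b" and returns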
>
> Is it using some form of database locking? :(
>
>>> There was a mention of a watch-dog (ideally to ensure that delayed tasks can't just sit around forever), was that implemented?
>>
>> If _delayed_ tasks and 'normal' tasks are treated alike, this is just a matter of a timeout as a generic property on a task, so Mistral didn't have to have it. For the proposal above, separate treatment is necessary for _delayed_ tasks.
>
> Who responds to the timeout though? Isn't that process the watchdog then? Likely the triggering of a timeout causes something to react (in both cases).

The workflow engine IS this watch-dog (and in Mistral, the engine is a single manager for all flow executions; in the prototype we call it engine_manager, and I hate this name :)). The engine lives in-process or as a separate process. And it is passive - it executes in a thread of the callee. E.g., in-process it is called either on a messaging event handler thread or by a web method thread.

>>> [1] https://wiki.openstack.org/wiki/TaskFlow#Retries (new feature!)
>>
>> This is nice. I would call it a 'repeater': running a sub-flow several times with various data for various reasons is richer than 'retry'. What about a 'retry policy' on an individual task?
>
> We went back and forth on names a few times (controller, retry, and a few others I think), haha. Retry is what currently stuck :-P
>
> Repeater does sound nice also though, damn naming of 'things' ;)
>
> For retries on individual tasks, since taskflow supports flow nesting it's not that hard to do this (although it might not be the most elegant):
>
> >>> flow = LinearFlow("blah blah")
> >>> flow.add(LinearFlow("subblahblah", retry=XYZ).add(OneTask()))  <<< where the one task is literally one task, haha.
>
> A running example @ http://tinyurl.com/kv8q2t8
>
> Room for improvement obviously (isn't there always), but that would/should work.

That may be good enough: when the DSL is translated to a flow and a task demands repetition with a timeout, it's ok to do this trick under the hood when compiling the flow (see the runnable version just below):

flow.add(LinearFlow("subblahblah", retry=XYZ).add(OneTask(), Timeout(timeout)))

>
> -Josh
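(The nesting trick spelled out with the real taskflow imports - should work as-is against the retries feature linked above; Timeout is DZ's invented task, not a taskflow primitive, so it is left out here:)

    import random

    from taskflow import engines
    from taskflow import retry
    from taskflow import task
    from taskflow.patterns import linear_flow

    class OneTask(task.Task):
        """Flaky on purpose, so the retry controller has something to do."""

        def execute(self):
            if random.random() < 0.5:
                raise IOError("transient failure, please retry")
            print("did the one thing")

    flow = linear_flow.Flow("blah blah")
    # Only the sub-flow owns the retry controller, so only OneTask gets
    # re-run (up to 3 attempts) when it fails; siblings added to the
    # outer flow are not re-run.
    flow.add(
        linear_flow.Flow("subblahblah",
                         retry=retry.Times(attempts=3)).add(OneTask()))
    engines.run(flow)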
>>> From: Dmitri Zimine <[email protected]>
>>> Reply-To: "OpenStack Development Mailing List (not for usage questions)" <[email protected]>
>>> Date: Thursday, March 27, 2014 at 4:43 PM
>>> To: "OpenStack Development Mailing List (not for usage questions)" <[email protected]>
>>> Subject: [openstack-dev] [Mistral] How Mistral handling long running delegate tasks
>>>
>>>> Following up on http://tinyurl.com/l8gtmsw and http://tinyurl.com/n3v9lt8: this explains how Mistral handles long running delegate tasks. Note that a 'passive' workflow engine can handle both normal tasks and delegates the same way. I'll also put this on the ActionDesign wiki, after discussion.
>>>>
>>>> Diagram: https://docs.google.com/a/stackstorm.com/drawings/d/147_EpdatpN_sOLQ0LS07SWhaC3N85c95TkKMAeQ_a4c/edit?usp=sharing
>>>>
>>>> 1. On start(workflow), the engine creates a new workflow execution, computes the first batch of tasks, and sends them to ActionRunner [1].
>>>> 2. ActionRunner creates an action and calls action.run(input).
>>>> 3. The action does the work (compute 10!), produces the results, and returns them to the executor. If it returns, status=SUCCESS; if it fails, it throws an exception, status=ERROR.
>>>> 4. ActionRunner notifies the Engine that the task is complete: task_done(execution, task, status, results) [2].
>>>> 5. The Engine computes the next task(s) ready to trigger, according to control flow and data flow, and sends them to ActionRunner.
>>>> 6. Like step 2: ActionRunner calls the action's run(input).
>>>> 7. A delegate action doesn't produce results: it calls out to the 3rd party system, which is expected to make a callback to a workflow service with the results. It returns to ActionRunner without results, "immediately".
>>>> 8. ActionRunner marks status=RUNNING [?]
>>>> 9. The 3rd party system takes a 'long time' == longer than any system component can be assumed to stay alive.
>>>> 10. The 3rd party component calls a Mistral WebHook which resolves to engine.task_complete(workbook, id, status, results).
>>>>
>>>> Comments:
>>>>
>>>> * One Engine handles multiple executions of multiple workflows. It exposes two main operations - start(workflow) and task_complete(execution, task, status, results) - and is responsible for defining the next batch of tasks based on control flow and data flow. The Engine is passive - it runs in a host's thread. Engine and ActionRunner communicate via task queues asynchronously; for details, see https://wiki.openstack.org/wiki/Mistral/POC
>>>>
>>>> * The Engine doesn't distinguish sync and async actions; it doesn't deal with Actions at all. It only reacts to task completions, handling the results, updating the state, and queuing the next set of tasks.
>>>>
>>>> * Only an Action can know and define whether it is a delegate or not. Some protocol is required to let ActionRunner know that the action is not returning the results immediately. A convention of returning None may be sufficient.
>>>>
>>>> * Mistral exposes engine.task_done in the REST API so 3rd party systems can call a web hook.
>>>>
>>>> DZ.
>>>>
>>>> [1] I use ActionRunner instead of Executor (the current name) to avoid confusion: it is the Engine which is responsible for executions; ActionRunner only runs actions. We should rename it in the code.
>>>>
>>>> [2] I use task_done for brevity and out of pure spite; in the code it is convey_task_results.
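(To illustrate the returning-None convention from the comments above - a toy sketch, not the actual Mistral POC code; everything except the task_complete signature is made up for illustration:)

    class ActionRunner(object):
        """Toy version of the ActionRunner side of steps 2-8 above."""

        def __init__(self, engine):
            self._engine = engine  # anything exposing task_complete(...)

        def run_action(self, execution, task, action, action_input):
            try:
                results = action.run(action_input)
            except Exception as exc:
                # Step 3: an exception means status=ERROR.
                self._engine.task_complete(execution, task, 'ERROR', str(exc))
                return
            if results is None:
                # Steps 7-8: a delegate action returned no results; leave
                # the task RUNNING and wait for the 3rd party system to hit
                # the webhook, which ends up calling engine.task_complete().
                task['state'] = 'RUNNING'
            else:
                # Steps 3-4: a normal action; report SUCCESS right away.
                self._engine.task_complete(execution, task, 'SUCCESS', results)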
_______________________________________________
OpenStack-dev mailing list
[email protected]
http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
