[ 
https://issues.apache.org/jira/browse/AIRFLOW-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qian Yu reassigned AIRFLOW-5648:
--------------------------------

    Assignee: Qian Yu

> Add ClearTaskOperator to allow clearing/re-running tasks from within a DAG
> --------------------------------------------------------------------------
>
>                 Key: AIRFLOW-5648
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5648
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: operators
>    Affects Versions: 1.10.5
>            Reporter: Qian Yu
>            Assignee: Qian Yu
>            Priority: Major
>              Labels: ClearTaskOperator, airflow, clear, duplicate, operator, 
> re-run, rerun, task
>
> There are use cases where some external condition changes and a section 
> of the DAG needs to be re-run after it has already finished. 
> Here is an example I recently encountered:
> We have a DAG that runs the following in the morning for execution_date T. 
> The preliminary result of task J is needed at 9am. K, L and M 
> need to wait for Sensor to pass, so they do not finish until much later in 
> the evening:
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
>                ^                   ^
>                |                   |
> B >> D >> F>>>>>               Sensor
> {code}
> Later in the afternoon, at 3pm, some external condition changes (indicated 
> by Sensor). At that point, we need to re-run task A and all its downstream 
> tasks that have already run (i.e. A, C, E, G, H, I, J) to reflect possible 
> changes. Other finished tasks such as B, D, F do not need to be re-run. The 
> new result of J is needed in the evening by downstream tasks K, L, M, which 
> have been waiting.
> One possible approach is to duplicate the section that needs to be re-run, 
> as in the following diagram. A1 runs the same command as A, C1 runs the same 
> command as C, etc. This mostly works, but it makes the DAG unnecessarily 
> large. It also causes code duplication, because the tasks A1, C1, E1, G1, H1, 
> I1, J1 are all identical to the original tasks. In this simplified example 
> the duplication does not look too bad, but in the real cases I faced, task A 
> has many downstream tasks with complex dependencies, and copying all of them 
> is more difficult. Because of this duplication, the next time someone updates 
> the DAG and inserts a new task between E and G, it will be hard to remember 
> to add it between E1 and G1 as well.
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
>                ^                   ^          
>                |                   |__________         
> B >> D >> F>>>>                               |
>                                               |
> Sensor >> A1 >> C1 >> E1 >> G1 >> H1 >> I1 >> J1
> {code}
> Instead of duplicating the tasks, I propose adding a ClearTaskOperator. 
> This operator takes an external_task_id as its parameter. When 
> ClearTaskOperator runs, it clears the state of the given external_task_id and 
> all its downstream tasks, which causes them to re-run. The problem I'm 
> facing can then be solved without duplicating all those tasks. With 
> ClearTaskOperator, the DAG can look like this:
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
>                ^                   ^          
>                |                   |         
> B >> D >> F>>>>                    |
>                                    |
> Sensor >> Clear_Task_A >>>>>>>>>>>>>
> {code}
> In the above DAG, Clear_Task_A is a ClearTaskOperator defined like this. When 
> Clear_Task_A executes, it clears task A and all its downstream tasks (so in 
> this case it causes A, C, E, G, H, I, J to be cleared and re-run).
> {code:python}
> Clear_Task_A = ClearTaskOperator(task_id="Clear_Task_A",
>                                  external_task_id="A")
> {code}
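
The selection rule described above ("task A and all its downstream tasks that have already run") can be illustrated with a small, self-contained sketch. It is plain Python and does not use any Airflow API. The names DOWNSTREAM, FINISHED and tasks_to_clear are hypothetical and exist only for this illustration, and the edge list is transcribed from the third diagram in the description. How the proposed operator would actually clear task state is not specified in this issue, so the sketch only computes which tasks would be affected.

```python
from collections import deque

# Hypothetical edge map (task -> direct downstream tasks), transcribed from
# the third diagram in the issue description. Not an Airflow object.
DOWNSTREAM = {
    "A": ["C"], "C": ["E"], "E": ["G"], "G": ["H"], "H": ["I"],
    "I": ["J"], "J": ["K"], "K": ["L"], "L": ["M"], "M": ["Finish"],
    "Finish": [],
    "B": ["D"], "D": ["F"], "F": ["G"],
    "Sensor": ["Clear_Task_A"], "Clear_Task_A": ["K"],
}

# Assumed state at 3pm: everything up to J has finished, and Sensor has
# just passed. K, L, M and Finish have not run yet.
FINISHED = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "Sensor"}


def tasks_to_clear(downstream, root, finished):
    """Return root plus its transitive downstream tasks that already finished.

    Tasks are returned in breadth-first order starting from root.
    """
    seen = {root}
    queue = deque([root])
    selected = []
    while queue:
        task = queue.popleft()
        # Only tasks that already ran need their state cleared.
        if task in finished:
            selected.append(task)
        for child in downstream.get(task, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return selected


if __name__ == "__main__":
    print(tasks_to_clear(DOWNSTREAM, "A", FINISHED))
```

With the assumed state, the traversal starting at A selects A, C, E, G, H, I and J. B, D and F are never visited because they are upstream of G rather than downstream of A, which matches the behaviour the description asks for.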



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
