[ https://issues.apache.org/jira/browse/AIRFLOW-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Qian Yu reassigned AIRFLOW-5648:
--------------------------------

    Assignee: Qian Yu

> Add ClearTaskOperator to allow clearing/re-running tasks from within a DAG
> --------------------------------------------------------------------------
>
>                 Key: AIRFLOW-5648
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5648
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: operators
>    Affects Versions: 1.10.5
>            Reporter: Qian Yu
>            Assignee: Qian Yu
>            Priority: Major
>              Labels: ClearTaskOperator, airflow, clear, duplicate, operator,
> re-run, rerun, task
>
> There are use cases where some external condition has changed and a section
> of the DAG needs to be re-run after its tasks have already finished.
> Here is an example I recently encountered:
> We have a DAG that runs the following in the morning for execution_date T.
> The preliminary result of task J is needed at 9am. K, L and M need to wait
> for Sensor to pass, so they are not done until much later in the evening:
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
>                ^                   ^
>                |                   |
> B >> D >> F>>>>>                 Sensor
> {code}
> Later on, at 3pm, some external condition changes (indicated by Sensor).
> At that point, we need to re-run task A and all its downstream tasks that
> have already run (i.e. A, C, E, G, H, I, J) to reflect possible changes.
> Other finished tasks such as B, D, F do not need to be re-run. The new
> result of J is needed in the evening by downstream tasks K, L, M, which
> have been waiting.
> One possible approach is to duplicate the section that needs to be re-run,
> as in the following diagram, where A1 runs the same command as A, C1 runs
> the same command as C, etc. This mostly works, but it makes the DAG
> unnecessarily large. It also causes code duplication because the tasks
> A1, C1, E1, G1, H1, I1, J1 are all identical to the original tasks.
> In this simplified example, the duplication does not look too bad, but in
> the real examples I faced, task A has many downstream tasks with complex
> dependencies, and copying all of them is more difficult. Because of this
> duplication, the next time someone updates the DAG and inserts a new task
> between E and G, it will be hard to remember to add it between E1 and G1
> as well.
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
>                ^                   ^
>                |                   |__________
> B >> D >> F>>>>                               |
>                                               |
> Sensor >> A1 >> C1 >> E1 >> G1 >> H1 >> I1 >> J1
> {code}
> Instead of duplicating the tasks, I'm proposing adding a ClearTaskOperator.
> This operator takes an external_task_id as its parameter. When
> ClearTaskOperator runs, it clears the state of the given external_task_id
> and all its downstream tasks, which causes them to re-run. The problem I'm
> facing can then be tackled without duplicating all those tasks. With
> ClearTaskOperator, the DAG can look like this:
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
>                ^                   ^
>                |                   |
> B >> D >> F>>>>                    |
>                                    |
> Sensor >> Clear_Task_A >>>>>>>>>>>>>
> {code}
> In the above DAG, Clear_Task_A is a ClearTaskOperator defined as follows.
> When Clear_Task_A executes, it clears task A and all its downstream tasks
> (so in this case it causes A, C, E, G, H, I, J to be cleared and re-run).
> {code:python}
> Clear_Task_A = ClearTaskOperator(task_id="Clear_Task_A",
>                                  external_task_id="A")
> {code}
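
To make the "clear A and all its downstream tasks" step concrete, here is a minimal sketch in plain Python of how the set of tasks to clear could be computed for the example DAG. It does not use any Airflow API. The names `EDGES`, `FINISHED`, `downstream_closure` and `tasks_to_clear` are hypothetical, the points where F and Clear_Task_A join the main chain are assumptions read off the diagram, and filtering by "already finished" is one possible interpretation of why only A, C, E, G, H, I, J end up re-running.

```python
from collections import deque

# Example DAG as an adjacency map: task -> direct downstream tasks.
# The join points for F and Clear_Task_A are assumptions taken from the diagram.
EDGES = {
    "A": ["C"], "C": ["E"], "E": ["G"], "G": ["H"], "H": ["I"], "I": ["J"],
    "J": ["K"], "K": ["L"], "L": ["M"], "M": ["Finish"],
    "B": ["D"], "D": ["F"], "F": ["G"],
    "Sensor": ["Clear_Task_A"], "Clear_Task_A": ["K"],
}

# Tasks assumed to have finished by the time Clear_Task_A runs at 3pm.
FINISHED = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "Sensor"}


def downstream_closure(edges, root):
    """Return root followed by every task reachable from it, in BFS order."""
    order = [root]
    seen = {root}
    queue = deque([root])
    while queue:
        task = queue.popleft()
        for child in edges.get(task, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


def tasks_to_clear(edges, root, finished):
    """Keep only the downstream tasks that have already run."""
    return [t for t in downstream_closure(edges, root) if t in finished]


if __name__ == "__main__":
    print(tasks_to_clear(EDGES, "A", FINISHED))
```

Under these assumptions the result is A, C, E, G, H, I, J: tasks B, D and F are not downstream of A and are left alone, while K, L, M and Finish are downstream but have not run yet, so there is nothing to re-run for them.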