What I mean by dynamic is this part of your code:

    if table["branch_flag"] == BranchFlags.yes:
        task_1 >> task_2

This will be run at DAG creation time, not at DAG run time, and therefore your 
“branch” will not work as you expect.
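
To make the creation-time versus run-time distinction concrete, here is a 
minimal plain-Python sketch that does not use Airflow at all. The build_edges 
helper, the "name" key and the sample configs are hypothetical stand-ins for 
what a DAG definition file does when it is parsed: the `if` is evaluated once 
per table while the edges are being recorded, and nothing re-evaluates it 
later when the tasks run.

```python
from enum import Enum


class BranchFlags(Enum):
    yes = "yes"
    no = "no"


def build_edges(list_of_tables):
    """Hypothetical stand-in for parsing a DAG file.

    Returns the (upstream, downstream) pairs that would be declared.
    """
    edges = []
    for table in list_of_tables:
        name = table["name"]  # assumed config key, for illustration only
        edges.append(("task_1_%s" % name, "task_3_%s" % name))
        # Evaluated here, while the structure is being built, not at run time.
        if table["branch_flag"] == BranchFlags.yes:
            edges.append(("task_1_%s" % name, "task_2_%s" % name))
    return edges


tables = [
    {"name": "a", "branch_flag": BranchFlags.yes},
    {"name": "b", "branch_flag": BranchFlags.no},
]
edges = build_edges(tables)
```

Only table "a" gets the extra edge, and that decision is fixed as soon as the 
file has been parsed.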

To get conditional branches you have to use the Branch Operators (either the 
BranchPythonOperator or sub-class the BaseBranchOperator):
https://airflow.apache.org/docs/stable/concepts.html?highlight=branch#branching
https://www.astronomer.io/guides/airflow-branch-operator/
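
As a rough sketch of the run-time alternative: a BranchPythonOperator takes a 
python_callable that returns the task_id (or list of task_ids) to follow, and 
the other directly downstream tasks are skipped. The function name and the 
task ids below are made up for illustration, and the wiring in the comments 
assumes an Airflow 1.10-style import path; it is not taken from your DAG.

```python
def choose_branch(branch_flag):
    """Return the task_id to follow; both task ids are hypothetical."""
    if branch_flag == "yes":
        return "consolidate"
    return "skip_consolidate"


# Assumed wiring inside a DAG file (shown as comments, not executed here):
#   from airflow.operators.python_operator import BranchPythonOperator
#   branch = BranchPythonOperator(
#       task_id="branch",
#       python_callable=choose_branch,
#       op_args=["yes"],
#       dag=dag)
#   branch >> [consolidate, skip_consolidate]
```

Note that this picks one path per run, so it only fits cases where the choice 
really has to be made while the DAG is running.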


Damian

From: Reed Villanueva <[email protected]>
Sent: Tuesday, January 28, 2020 14:54
To: [email protected]
Subject: Re: Can airflow dags have branches that both run, but do not converge?

I am not trying to dynamically branch while the dag is running, if that's what 
you're asking.
The dag is intended to be static.
In the code given, the list_of_tables variable is a list of dicts of configs 
for different datasets, some of which use tasks that could be done concurrently 
(e.g. write to local drive and write to remote DB), hence the branching.

On Tue, Jan 28, 2020 at 6:51 AM Shaw, Damian P. <[email protected]> wrote:
Hi Reed,

Your DAG structure shouldn’t be changing dynamically or frequently. The DAG 
itself in Airflow is fairly static. If your DAG definition file changes the DAG 
based on conditions, Airflow is simply going to think the tasks you are no 
longer defining don’t exist anymore. That’s what Airflow is telling you: you 
are no longer defining that task, so it can’t find it anymore.
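
A small plain-Python illustration of that point, with no Airflow involved. The 
defined_task_ids helper and the "name" key are hypothetical; the idea is just 
that two parses of the same file under different conditions can yield 
different sets of task ids, and whatever is missing from the later parse is 
what can no longer be found.

```python
def defined_task_ids(list_of_tables):
    """Hypothetical helper: the task ids one parse of the file would define."""
    ids = set()
    for table in list_of_tables:
        ids.add("task_1_%s" % table["name"])
        # A condition that can change between parses changes the set of tasks.
        if table["branch_flag"] == "yes":
            ids.add("task_2_%s" % table["name"])
    return ids


first_parse = defined_task_ids([{"name": "qwerty", "branch_flag": "yes"}])
second_parse = defined_task_ids([{"name": "qwerty", "branch_flag": "no"}])

# Task ids known from the first parse that the second parse no longer defines.
missing = first_parse - second_parse
```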



Damian

From: Reed Villanueva <[email protected]>
Sent: Monday, January 27, 2020 17:50
To: [email protected]
Subject: Can airflow dags have branches that both run, but do not converge?

...because this seems not to be the case.


I have an airflow graph with a conditional branch defined like

class BranchFlags(Enum):
    yes = "yes"
    no = "no"

...

for table in list_of_tables # type list(dict)
    task_1 = BashOperator(
        task_id='task_1_%s' % table["conf1"],
        bash_command='bash script1.sh %s' % table["conf1"],
        dag=dag)

    if table["branch_flag"] == BranchFlags.yes:
        consolidate = BashOperator(
            task_id='task_3_%s' % table["conf2"],
            bash_command='python %s/consolidate_parquet.py %s' % table["conf2"],
            dag=dag)

    task_3 = BashOperator(
        task_id='task_3_%s' % table["conf3"],
        bash_command='bash script3.sh %s' % table["conf3"],
        dag=dag)

    task_1 >> task_3
    if table["branch_flag"] == BranchFlags.yes:
        task_1 >> task_2

Even though the longer parts of the graph run fine, the lone branch is not 
being run for the one sequence / pipeline that was supposed to branch. When 
viewing the logs for the task, I see

*** Task instance did not exist in the DB

This is weird to me, since ostensibly the scheduler DB sees the task, as it 
does appear in the web UI graph. I'm not sure what is going on here: other 
changes to the dag .py file do show up in the graph and are executed by the 
scheduler when running the graph. Attempting to view the task's Task Instance 
Details throws the error

Task [dagname.task_3_qwerty] doesn't seem to exist at the moment

Running airflow resetdb (as I've seen in other posts) does nothing for the 
problem.

Note that the intention is that the short branch runs concurrently with the 
longer branch (not as an either/or choice).

Anyone know why this would be happening or have some debugging tips?

This electronic message is intended only for the named
recipient, and may contain information that is confidential or
privileged. If you are not the intended recipient, you are
hereby notified that any disclosure, copying, distribution or
use of the contents of this message is strictly prohibited. If
you have received this message in error or are not the named
recipient, please notify us immediately by contacting the
sender at the electronic mail address noted above, and delete
and destroy all copies of this message. Thank you.


==============================================================================
Please access the attached hyperlink for an important electronic communications 
disclaimer:
http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
==============================================================================
