Hi All,

 

Could anyone please help me with the ExternalTaskSensor? I am facing an issue with it, and my two DAGs are attached below.

 

I have set mode = 'reschedule' on the sensor, which causes the sensor task to keep rescheduling, and task T2 in my second DAG (Hello2) is never triggered.

 

Thanks

Sai Prasad Golla

import datetime
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Defaults handed to the Hello1 DAG through its ``default_args`` parameter.
# Other keys that were considered but are not set here:
# retry_delay, queue, pool, priority_weight, end_date.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2020, 1, 18, 1, 15),
    email=['[email protected]'],
    # No mail is sent on failure or on retry.
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
)



# DAG "Hello1": scheduled every five minutes from 2020-01-18 01:15, with
# catchup disabled. start_date is given here and also inside default_args.
dag = DAG(
    'Hello1',
    default_args=default_args,
    start_date=datetime(2020, 1, 18, 1, 15),
    schedule_interval=timedelta(minutes=5),
    catchup=False,
)


# Task T1 of Hello1: runs the test shell script.
# The trailing space after ".sh" is intentional: Airflow's BashOperator
# documentation advises it so that the value is executed as a command rather
# than being looked up as a Jinja template file.
t1 = BashOperator(
    dag=dag,
    task_id='T1',
    bash_command='/home/radmin/AIStratBuilder/script/Test_Script.sh ',
)

import datetime
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.operators.dummy_operator import DummyOperator


# DAG "Hello2": same five-minute cadence and start date as Hello1; it holds the
# sensor that waits on Hello1 and the downstream task T2.
# NOTE(review): unlike Hello1, this DAG does not pass catchup=False, so it
# presumably backfills runs whose execution dates Hello1 never produced --
# confirm whether both DAGs should use the same catchup setting (and ideally an
# identical cron schedule) so their execution dates line up for the sensor.
dag = DAG(
    dag_id='Hello2',
    description='DAG with sensor',
    schedule_interval=timedelta(minutes=5),
    start_date=datetime(2020, 1, 18, 1, 15),
)

# Sensor task: blocks T2 until task T1 of DAG Hello1 has finished successfully.
sensor = ExternalTaskSensor(
    task_id='Wait_Task',
    external_dag_id='Hello1',
    external_task_id='T1',
    # The sensor checks the Hello1 run whose execution_date equals this run's
    # execution_date minus execution_delta.
    # NOTE(review): both DAGs share the same interval and start date, so a
    # five-minute delta targets the *previous* Hello1 run -- confirm that is
    # intended; drop execution_delta to wait on the same-dated run instead.
    execution_delta=timedelta(minutes=5),
    # 'reschedule' frees the worker slot between pokes instead of sleeping.
    mode='reschedule',
    # allowed_states must be a list of task states. The previous value was the
    # *string* 'None', which is truthy (so the default was not applied) and
    # matches no real task state, leaving the sensor rescheduling forever.
    allowed_states=['success'],
    dag=dag)


# Task T2 of Hello2: prints a greeting once the sensor has succeeded.
# The trailing space in the command is kept exactly as written.
task = BashOperator(
    dag=dag,
    task_id='T2',
    bash_command='echo Hi ',
)

# Dependency: Wait_Task (sensor) must complete before T2 runs.
sensor >> task

Reply via email to