Thanks Daniel. I'm pretty new to Airflow; your suggestions sound great, but
could you add a reference to some implementation examples?

Best,
Avi
On Thu, Jun 3, 2021 at 11:46 AM Daniel Standish <dpstand...@gmail.com> wrote:

>> So do I have to go with option 1? Meaning, add a step_checker after
>> each step to verify completion?
>
> One thing you can do to make this less onerous is subclass
> EmrAddStepsOperator and add a property that produces the sensor it
> needs.
>
> So you would have something like:
>
> task1 = EmrAddStepsOperator(...)
> task1_sensor = task1.sensor_task
> task1 >> task1_sensor
>
> Another option you have is to create an EmrAddStepsSynchronousOperator
> and do the waiting in the same task.
>
> On Thu, Jun 3, 2021 at 7:46 AM Avi Levi <a...@theneura.com> wrote:
>
>> Thanks Tom,
>> But this way there is no dependency between the steps, right? Here
>> you are just verifying that they both completed. However, I do want
>> step2 to depend on step1 completing successfully.
>> So do I have to go with option 1? Meaning, add a step_checker after
>> each step to verify completion?
>>
>> option 1: start_pipeline >> create_emr_cluster >> step1 >>
>> step1_watcher >> step2 >> step2_watcher >> terminate
>>
>> option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >>
>> step2_watcher >> terminate
>>
>> On Thu, Jun 3, 2021 at 5:20 AM Tom Korrison
>> <tom.korri...@concirrus.com> wrote:
>>
>>> Hi,
>>>
>>> I only have one add_steps task, but a step sensor for each step
>>> added, e.g.:
>>>
>>> start_daily_pipeline = DummyOperator(
>>>     task_id="start_daily_pipeline",
>>>     dag=dag,
>>> )
>>>
>>> cluster_creator = EmrCreateJobFlowOperator(
>>>     task_id="create_job_flow",
>>>     aws_conn_id="aws_role_default",
>>>     emr_conn_id="emr_default",
>>>     job_flow_overrides=JOB_FLOW_OVERRIDES,
>>>     dag=dag,
>>> )
>>>
>>> step_adder = EmrAddStepsOperator(
>>>     task_id="add_steps",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>>     aws_conn_id="aws_role_default",
>>>     steps=SPARK_STEPS,
>>>     dag=dag,
>>> )
>>>
>>> step1_checker = EmrStepSensor(
>>>     task_id="watch_step_1",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> step2_checker = EmrStepSensor(
>>>     task_id="watch_step_2",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> job_flow_checker = EmrJobFlowSensor(
>>>     task_id="watch_job_flow",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> cluster_remover = EmrTerminateJobFlowOperator(
>>>     task_id="remove_cluster",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>>     aws_conn_id="aws_role_default",
>>>     dag=dag,
>>> )
>>>
>>> start_daily_pipeline >> cluster_creator >> step_adder
>>> step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> cluster_remover
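[Editor's note] For the implementation example Avi asked for, here is a rough
sketch of Daniel's first suggestion above: an EmrAddStepsOperator subclass
whose property hands out the sensor that watches its step. Nothing like this
ships with the Amazon provider; the class name EmrAddStepsWithSensorOperator
and the sensor_task property are invented here for illustration, it assumes
each task submits exactly one step, and STEP1/dag are the thread's own
variables.

from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor


class EmrAddStepsWithSensorOperator(EmrAddStepsOperator):
    """EmrAddStepsOperator that can build the EmrStepSensor for its own step."""

    @property
    def sensor_task(self) -> EmrStepSensor:
        # Watch the first step id this task pushes to XCom
        # (assumes each task submits exactly one step; call this once).
        return EmrStepSensor(
            task_id=f"{self.task_id}_sensor",
            job_flow_id=self.job_flow_id,  # templated; the sensor renders it too
            step_id=f"{{{{ task_instance.xcom_pull(task_ids='{self.task_id}')[0] }}}}",
            aws_conn_id=self.aws_conn_id,
            dag=self.dag,  # assumes this operator was created with dag=dag
        )


task1 = EmrAddStepsWithSensorOperator(
    task_id="step1",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster') }}",
    aws_conn_id="aws_default",
    steps=STEP1,
    dag=dag,
)
task1_sensor = task1.sensor_task
task1 >> task1_sensor

The property just mirrors the xcom_pull pattern from Tom's DAG, pointed at
the submitting task's own task_id instead of a shared add_steps task.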
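[Editor's note] Daniel's second suggestion, sketched under the same caveat
(no EmrAddStepsSynchronousOperator exists in the provider): override
execute() so the submitting task itself waits for its steps, here via
boto3's standard EMR "step_complete" waiter.

from airflow.providers.amazon.aws.hooks.emr import EmrHook
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator


class EmrAddStepsSynchronousOperator(EmrAddStepsOperator):
    """Submits the steps, then blocks until EMR reports them complete."""

    def execute(self, context):
        step_ids = super().execute(context)  # submits steps, returns their ids
        emr_client = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
        waiter = emr_client.get_waiter("step_complete")
        for step_id in step_ids:
            # Raises WaiterError if a step fails, failing the task with it.
            waiter.wait(
                ClusterId=self.job_flow_id,  # templated field, rendered by now
                StepId=step_id,
                WaiterConfig={"Delay": 30, "MaxAttempts": 120},
            )
        return step_ids

The trade-off: a task blocking like this holds a worker slot for the whole
step run, which is exactly what the separate-sensor approach avoids.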
>>> From: Avi Levi <a...@theneura.com>
>>> Sent: 03 June 2021 08:21
>>> To: users@airflow.apache.org
>>> Subject: Create dependencies between emr steps
>>>
>>> Hi,
>>>
>>> How can I create dependencies between EMR steps? Do I need to create
>>> a step watcher between each one like below (option 1), or can they
>>> depend on each other directly, without the step watcher (option 2)?
>>> Meaning something like this:
>>>
>>> step1 = EmrAddStepsOperator(
>>>     task_id="step1",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>>     aws_conn_id="aws_default",
>>>     steps=STEP1,
>>>     dag=dag,
>>> )
>>>
>>> step2 = EmrAddStepsOperator(
>>>     task_id="step2",
>>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>>     aws_conn_id="aws_default",
>>>     steps=STEP2,
>>>     dag=dag,
>>> )
>>>
>>> step1_watcher = EmrStepSensor(
>>>     task_id="step1_watcher",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>>>     # pull the step id from the task that actually submitted it
>>>     step_id="{{ task_instance.xcom_pull('step1', key='return_value')[0] }}",
>>>     aws_conn_id="aws_default",
>>>     dag=dag,
>>> )
>>>
>>> step2_watcher = EmrStepSensor(
>>>     task_id="step2_watcher",
>>>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>>>     step_id="{{ task_instance.xcom_pull('step2', key='return_value')[0] }}",
>>>     aws_conn_id="aws_default",
>>>     dag=dag,
>>> )
>>>
>>> option 1: start_pipeline >> create_emr_cluster >> step1 >>
>>> step1_watcher >> step2 >> step2_watcher >> terminate
>>>
>>> option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >>
>>> step2_watcher >> terminate
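[Editor's note] One caveat on option 2: EmrAddStepsOperator returns as soon
as its steps are submitted, so the step1 >> step2 edge there only orders the
submissions. EMR will still run the queued steps one at a time (the default
step concurrency level is 1), but the DAG never checks whether step1
succeeded, so a failure surfaces late, if at all. Option 1 is the one that
makes step2 depend on step1 actually completing, with wiring like:

# each step is followed by the sensor that watches it, so step2 is
# only submitted after step1 has finished successfully
start_pipeline >> create_emr_cluster >> step1 >> step1_watcher
step1_watcher >> step2 >> step2_watcher >> terminate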