Thanks Tom,
But this way there is no dependency between the steps, right? Here you are
just verifying that they both completed. However, I do want step2 to
depend on step1 completing successfully.
So do I have to go with option 1, meaning add a step_checker after each step
to verify completion?
option 1:  start_pipeline >> create_emr_cluster >> step1 >> step1_watcher
>> step2 >> step2_watcher >> terminate

option 2:  start_pipeline >> create_emr_cluster >> step1 >> step2 >>
step2_watcher >> terminate
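
To make option 1 concrete, here is a minimal sketch in plain Python (no Airflow import, so it runs on its own). The helper names `option1_order` and `watcher_step_id_template` are hypothetical and are not Airflow APIs; the task ids are the ones used in this thread. It assumes each add-steps task pushes its list of step ids to XCom under `return_value`, so each watcher would pull the step id from its own add-steps task rather than from a single 'add_steps' task.

```python
def watcher_step_id_template(adder_task_id, index=0):
    """Build the Jinja template a watcher could use as its step_id.

    adder_task_id is the task_id of the add-steps task that submitted the
    step; index selects one step id from the list that task pushed to XCom.
    (Hypothetical helper, for illustration only.)
    """
    return (
        "{{ task_instance.xcom_pull(task_ids='%s', key='return_value')[%d] }}"
        % (adder_task_id, index)
    )


def option1_order(step_task_ids):
    """Return the task order for option 1: a watcher after every step.

    Placing the watcher between step N and step N+1 is what makes step N+1
    wait for step N to finish instead of only being submitted after it.
    """
    order = ["start_pipeline", "create_emr_cluster"]
    for task_id in step_task_ids:
        order += [task_id, task_id + "_watcher"]
    order.append("terminate")
    return order


if __name__ == "__main__":
    print(" >> ".join(option1_order(["step1", "step2"])))
    print(watcher_step_id_template("step2"))
```

In a real DAG the returned string would be passed as `step_id` to the matching EmrStepSensor, and the tasks would be chained with `>>` in the order shown.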

On Thu, Jun 3, 2021 at 5:20 AM Tom Korrison <tom.korri...@concirrus.com>
wrote:

> Hi,
>
>
>
> I only have one add_step task but a step_sensor for each step added.
>
>
>
> e.g.
>
>
>
> start_daily_pipeline = DummyOperator(
>     task_id="start_daily_pipeline",
>     dag=dag
> )
>
> cluster_creator = EmrCreateJobFlowOperator(
>     task_id="create_job_flow",
>     aws_conn_id="aws_role_default",
>     emr_conn_id="emr_default",
>     job_flow_overrides=JOB_FLOW_OVERRIDES,
>     dag=dag,
> )
>
> step_adder = EmrAddStepsOperator(
>     task_id="add_steps",
>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>     aws_conn_id="aws_role_default",
>     steps=SPARK_STEPS,
>     dag=dag,
> )
>
> step1_checker = EmrStepSensor(
>     task_id="watch_step_1",
>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
>     aws_conn_id="aws_role_default",
>     dag=dag,
> )
>
> step2_checker = EmrStepSensor(
>     task_id="watch_step_2",
>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
>     aws_conn_id="aws_role_default",
>     dag=dag,
> )
>
> job_flow_checker = EmrJobFlowSensor(
>     task_id="watch_job_flow",
>     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
>     step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
>     aws_conn_id="aws_role_default",
>     dag=dag,
> )
>
> cluster_remover = EmrTerminateJobFlowOperator(
>     task_id="remove_cluster",
>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>     aws_conn_id="aws_role_default",
>     dag=dag,
> )
>
>
> start_daily_pipeline >> cluster_creator >> step_adder
> step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> cluster_remover
>
>
>
>
>
> *From:* Avi Levi <a...@theneura.com>
> *Sent:* 03 June 2021 08:21
> *To:* users@airflow.apache.org
> *Subject:* Create dependencies between emr steps
>
>
>
> Hi,
>
> How can I create dependencies between EMR steps? Do I need to create a
> step watcher between each one, as in option 1 below, or can I skip the
> step_watcher and make the steps depend on each other directly (option 2)?
> Something like this:
>
>
>
> step1 = EmrAddStepsOperator(
>     task_id="step1",
>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>     aws_conn_id="aws_default",
>     steps=STEP1,
>     dag=dag,
> )
>
>
>
> step2 = EmrAddStepsOperator(
>     task_id="step2",
>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>     aws_conn_id="aws_default",
>     steps=STEP2,
>     dag=dag,
> )
>
>
>
> step1_watcher = EmrStepSensor(
>     task_id="step_1watcher",
>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>     # Pull the step id from the task that added it ("step1"); no task named "add_steps" exists here.
>     step_id="{{ task_instance.xcom_pull(task_ids='step1', key='return_value')[0] }}",
>     aws_conn_id="aws_default",
>     dag=dag,
> )
>
>
>
> step2_watcher = EmrStepSensor(
>     task_id="step_2watcher",
>     job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
>     # Pull the step id from the task that added it ("step2"); no task named "add_steps" exists here.
>     step_id="{{ task_instance.xcom_pull(task_ids='step2', key='return_value')[0] }}",
>     aws_conn_id="aws_default",
>     dag=dag,
> )
>
>
>
> option 1:  start_pipeline >> create_emr_cluster >> step1 >> step1_watcher
> >> step2 >> step2_watcher >> terminate
>
>
>
> option 2:  start_pipeline >> create_emr_cluster >> step1 >> step2 >>
> step2_watcher >> terminate
>
>
>
