>
> So do I have to go with option 1, i.e. add a step_checker after each step
> to verify completion?


One thing you can do to make this less onerous is to subclass
EmrAddStepsOperator and add a property that returns the sensor it needs.

So you would have something like:

task1 = EmrAddStepsOperator(...)  # i.e. your subclass
task1_sensor = task1.sensor_task
task1 >> task1_sensor
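A minimal sketch of what such a `sensor_task` property would need to compute, assuming the cluster ID comes from an upstream create-cluster task. The helper `step_sensor_templates` and its parameters are hypothetical; it only builds the templated `job_flow_id` and `step_id` strings, so it runs without Airflow installed. The key detail is that `step_id` must be pulled from the XCom of the task that added the step, since `EmrAddStepsOperator` returns the list of submitted step IDs.

```python
from typing import Dict


def step_sensor_templates(
    add_steps_task_id: str, cluster_task_id: str, step_index: int = 0
) -> Dict[str, str]:
    """Build templated kwargs for an EmrStepSensor that watches one step.

    Hypothetical helper: a subclass's `sensor_task` property could pass these
    to EmrStepSensor together with task_id, aws_conn_id and dag.
    """
    # %-formatting is used because the literal Jinja "{{ }}" braces would
    # need escaping in an f-string or str.format().
    job_flow_id = (
        "{{ task_instance.xcom_pull(task_ids='%s', key='return_value') }}"
        % cluster_task_id
    )
    # The add-steps task's return value is the list of submitted step IDs,
    # so index into it to select the step this sensor should watch.
    step_id = (
        "{{ task_instance.xcom_pull(task_ids='%s', key='return_value')[%d] }}"
        % (add_steps_task_id, step_index)
    )
    return {"job_flow_id": job_flow_id, "step_id": step_id}
```

Inside the subclass, the property could then return something like `EmrStepSensor(task_id=f"{self.task_id}_sensor", aws_conn_id=self.aws_conn_id, dag=self.dag, **step_sensor_templates(self.task_id, "create_emr_cluster"))`. Caching it (for example with `functools.cached_property`) avoids creating a second sensor with the same task_id if the property is read twice.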

Another option is to create an EmrAddStepsSynchronousOperator
that does the waiting in the same task.
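A minimal sketch of the waiting loop such an operator's `execute()` could run after submitting its steps. `EmrAddStepsSynchronousOperator` is not an existing Airflow class, and `wait_for_step` with its parameters is hypothetical. The `describe_step` callable is injected so the loop can be tested without AWS; in a real task it would be the boto3 EMR client's `describe_step` method, which takes `ClusterId` and `StepId` and reports the state under `Step.Status.State`.

```python
import time
from typing import Any, Callable, Mapping

# Terminal EMR step states as reported by DescribeStep.
SUCCESS_STATES = frozenset({"COMPLETED"})
FAILURE_STATES = frozenset({"CANCELLED", "FAILED", "INTERRUPTED"})


class StepFailedError(RuntimeError):
    """Raised when the watched step ends in a failure state."""


def wait_for_step(
    describe_step: Callable[..., Mapping[str, Any]],
    cluster_id: str,
    step_id: str,
    poll_seconds: float = 30.0,
    timeout_seconds: float = 6 * 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Block until the step completes; raise if it fails or the timeout expires."""
    deadline = clock() + timeout_seconds
    while True:
        response = describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response["Step"]["Status"]["State"]
        if state in SUCCESS_STATES:
            return state
        if state in FAILURE_STATES:
            # Failing the task is what keeps downstream tasks from running.
            raise StepFailedError(f"EMR step {step_id} on {cluster_id} ended as {state}")
        if clock() >= deadline:
            raise TimeoutError(f"EMR step {step_id} still {state} after {timeout_seconds}s")
        # PENDING, RUNNING or CANCEL_PENDING: check again later.
        sleep(poll_seconds)
```

Design note: waiting inside the task keeps a worker slot busy for the whole step, as a sensor in the default `poke` mode does; a separate `EmrStepSensor` can instead use `mode="reschedule"` to free the slot between checks. boto3 also provides an EMR `step_complete` waiter that could replace the hand-written loop.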


On Thu, Jun 3, 2021 at 7:46 AM Avi Levi <a...@theneura.com> wrote:

> Thanks Tom,
> But with this approach there is no dependency between the steps, right? Here
> you are just verifying that both of them completed. However, I do want step2
> to depend on step1 completing successfully.
> So do I have to go with option 1, i.e. add a step_checker after each step
> to verify completion?
> option 1: start_pipeline >> create_emr_cluster >> step1 >> step1_watcher >> step2 >> step2_watcher >> terminate
>
> option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >> step2_watcher >> terminate
>
> On Thu, Jun 3, 2021 at 5:20 AM Tom Korrison <tom.korri...@concirrus.com>
> wrote:
>
>> Hi,
>>
>> I only have one add_steps task, but a step sensor for each step it adds.
>>
>> e.g.
>>
>> start_daily_pipeline = DummyOperator(
>>     task_id="start_daily_pipeline",
>>     dag=dag,
>> )
>>
>> cluster_creator = EmrCreateJobFlowOperator(
>>     task_id="create_job_flow",
>>     aws_conn_id="aws_role_default",
>>     emr_conn_id="emr_default",
>>     job_flow_overrides=JOB_FLOW_OVERRIDES,
>>     dag=dag,
>> )
>>
>> # A single task submits all steps; its return value is the list of step IDs.
>> step_adder = EmrAddStepsOperator(
>>     task_id="add_steps",
>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>     aws_conn_id="aws_role_default",
>>     steps=SPARK_STEPS,
>>     dag=dag,
>> )
>>
>> # One sensor per submitted step; [0] and [1] index into that list of step IDs.
>> step1_checker = EmrStepSensor(
>>     task_id="watch_step_1",
>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
>>     aws_conn_id="aws_role_default",
>>     dag=dag,
>> )
>>
>> step2_checker = EmrStepSensor(
>>     task_id="watch_step_2",
>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>     step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
>>     aws_conn_id="aws_role_default",
>>     dag=dag,
>> )
>>
>> # EmrJobFlowSensor waits for the cluster itself to reach a terminal state,
>> # so it takes no step_id argument.
>> job_flow_checker = EmrJobFlowSensor(
>>     task_id="watch_job_flow",
>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>     aws_conn_id="aws_role_default",
>>     dag=dag,
>> )
>>
>> cluster_remover = EmrTerminateJobFlowOperator(
>>     task_id="remove_cluster",
>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
>>     aws_conn_id="aws_role_default",
>>     dag=dag,
>> )
>>
>>
>> start_daily_pipeline >> cluster_creator >> step_adder
>> step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> cluster_remover
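`SPARK_STEPS` is not shown above. A sketch of what it might contain, assuming two PySpark scripts at hypothetical S3 paths launched through `command-runner.jar`; `spark_step` is a hypothetical helper. With the cluster's default step concurrency of 1, EMR runs submitted steps one at a time in order, so step 2 only starts after step 1 finishes, and `ActionOnFailure: "CANCEL_AND_WAIT"` on step 1 makes EMR cancel the steps still pending behind it if it fails. That gives an ordering dependency on the EMR side even with a single add_steps task; since `EmrStepSensor` treats a cancelled step as failed, `watch_step_2` would then fail as well.

```python
from typing import Any, Dict, List


def spark_step(name: str, script_uri: str) -> Dict[str, Any]:
    """Build one step definition in the shape EMR's AddJobFlowSteps expects."""
    return {
        "Name": name,
        # If this step fails, EMR cancels the steps still pending after it.
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            # command-runner.jar runs the command given in Args on the cluster.
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", script_uri],
        },
    }


# Hypothetical script locations; replace with your own.
SPARK_STEPS: List[Dict[str, Any]] = [
    spark_step("step1", "s3://example-bucket/jobs/step1.py"),
    spark_step("step2", "s3://example-bucket/jobs/step2.py"),
]
```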
>>
>> *From:* Avi Levi <a...@theneura.com>
>> *Sent:* 03 June 2021 08:21
>> *To:* users@airflow.apache.org
>> *Subject:* Create dependencies between emr steps
>>
>> Hi,
>>
>> How can I create dependencies between EMR steps? Do I need to create a
>> step watcher between each one, as below (option 1), or can the steps depend
>> on each other directly without a step_watcher (option 2)? Meaning
>> something like this:
>>
>> step1 = EmrAddStepsOperator(
>>     task_id="step1",
>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>     aws_conn_id="aws_default",
>>     steps=STEP1,
>>     dag=dag,
>> )
>>
>> step2 = EmrAddStepsOperator(
>>     task_id="step2",
>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>     aws_conn_id="aws_default",
>>     steps=STEP2,
>>     dag=dag,
>> )
>>
>> # Each watcher pulls its step ID from the task that submitted that step
>> # (this DAG has no "add_steps" task); [0] is the first step in its list.
>> step1_watcher = EmrStepSensor(
>>     task_id="step_1watcher",
>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>     step_id="{{ task_instance.xcom_pull(task_ids='step1', key='return_value')[0] }}",
>>     aws_conn_id="aws_default",
>>     dag=dag,
>> )
>>
>> step2_watcher = EmrStepSensor(
>>     task_id="step_2watcher",
>>     job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
>>     step_id="{{ task_instance.xcom_pull(task_ids='step2', key='return_value')[0] }}",
>>     aws_conn_id="aws_default",
>>     dag=dag,
>> )
>>
>> option 1: start_pipeline >> create_emr_cluster >> step1 >> step1_watcher >> step2 >> step2_watcher >> terminate
>>
>> option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >> step2_watcher >> terminate
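Option 1 is the shape that makes step2 wait for step1 to finish. In the provider versions current at the time of this thread, `EmrAddStepsOperator` succeeds as soon as its steps are submitted (it returns their IDs), so in option 2 nothing in Airflow stops step2 from being submitted while step1 is still running or after it has failed; later provider releases added a `wait_for_completion` argument, so check the version you have. A small sketch, with the hypothetical helper `option1_edges`, that generates the option 1 wiring for any number of steps as (upstream, downstream) task-id pairs, giving each step its own watcher and hanging the next step off that watcher.

```python
from typing import List, Sequence, Tuple


def option1_edges(
    step_names: Sequence[str],
    head: Sequence[str] = ("start_pipeline", "create_emr_cluster"),
    tail: Sequence[str] = ("terminate",),
) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) task-id pairs for a linear chain in which
    every add-steps task is immediately followed by its own watcher."""
    order: List[str] = list(head)
    for name in step_names:
        # The watcher sits between this step and the next one, so the next
        # step is only submitted once this one has completed successfully.
        order.extend([name, f"{name}_watcher"])
    order.extend(tail)
    # Consecutive tasks in the chain become upstream >> downstream edges.
    return list(zip(order, order[1:]))
```

With real operators, passing them in the same order to `chain()` from `airflow.models.baseoperator` (Airflow 2) wires the same edges as writing out the `>>` chain by hand.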
>>
>>
>>
>
