I've gotten this working now, thanks very much!

My problem was that my DAG wasn't using the TaskFlow API. My understanding is that the context object is provided as a freebie via the @task decorator?

Would it be accurate to say that the TaskFlow API is a virtual requirement for dynamic task mapping? I say "virtual" because any sort of workflow that requires multi-tenancy/concurrency is going to need some sort of unique ID/DAG run to tap into, and without reinventing the wheel this is best provided through the Airflow context?

Just clarifying this for my documentation PR...
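
For the record (and for the docs PR), here is roughly the shape that ended up working for me. This is a minimal sketch: the commands and env vars are placeholders, and it assumes the same imports and constants (NAMESPACE, REGISTRY_URL, AGENT_VERSION, dag) as my earlier snippet.

    from airflow.decorators import task

    @task
    def schema_dump_input_cmds(**context):
        # Airflow injects the runtime context here because this is a
        # @task-decorated function; each inner list becomes the cmds of one mapped pod.
        ds = context["ds"]
        return [["echo", f"schema dump for {ds}"]]

    @task
    def schema_dump_input_env_vars(ds=None):
        # Alternatively, declare just the context variables you need as kwargs.
        return [{"SNAPSHOT_DATE": ds}]  # placeholder env vars, one dict per mapped pod

    schema_dump = KubernetesPodOperator.partial(
        task_id="schema-dump-input",
        namespace=NAMESPACE,
        image=REGISTRY_URL + "/postgres-client:12-" + AGENT_VERSION,
        max_active_tis_per_dag=1,
        dag=dag,
    ).expand(
        cmds=schema_dump_input_cmds(),
        env_vars=schema_dump_input_env_vars(),
    )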



Daniel Standish wrote on 2022-06-15 6:09 PM:
It doesn't look like you've made schema_dump_input_cmds a task... maybe provide the full DAG?

Accessing context variables in the TaskFlow API is documented here: https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html#accessing-context-variables-in-decorated-tasks
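
For a quick illustration of what that section covers: a decorated task can ask for specific context variables as keyword arguments, or take **kwargs to receive the whole context (untested sketch, function name is just an example):

    from airflow.decorators import task

    @task
    def my_task(ds=None, **kwargs):
        # ds is injected by Airflow; the rest of the context lands in kwargs
        print(ds, kwargs["dag_run"].run_id)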

On Wed, Jun 15, 2022 at 5:07 PM Joe Auty <joea...@gmail.com> wrote:

    It looks like the context is not passed through:

    def schema_dump_input_cmds(**context):
        print(context)

    schema_dump = KubernetesPodOperator.partial(
        task_id="schema-dump-input",
        namespace=NAMESPACE,
        image=REGISTRY_URL + "/postgres-client:12-" + AGENT_VERSION,
        # .. snipped
        max_active_tis_per_dag=1,
        dag=dag
    ).expand(
        cmds=schema_dump_input_cmds(),
        env_vars=schema_dump_input_env_vars()
    )


    I'm seeing an empty object in my logs. Any idea why, or any
    suggestions here?




    Daniel Standish wrote on 2022-06-15 12:57 PM:
    Turns out this issue is actually documented here:
    
https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
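
    The short version of that section, as I read it: a templated field that is mapped
    does not get rendered, so the interpolation has to happen inside the task that
    generates the mapped values (rough sketch, not the exact doc example):

        @task
        def make_list():
            return ["{{ ds }}"]   # mapped + templated field: stays the literal string

        @task
        def make_list_rendered(ds=None):
            return [ds]           # render via the task context instead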

    On Wed, Jun 15, 2022 at 12:33 PM Daniel Standish <daniel.stand...@astronomer.io> wrote:

        Yeah, it does seem that templating does not work with expanded
        params at the moment. No promises, but I would bet this will
        change at some point. It seems reasonable and I can't think of
        a technical obstacle.

        Tal's approach looks good.  I came up with basically the same
        thing when testing this out:

        @dag.task
        def gen_vars(ds=None):
            return [{"MY_VAR": ds} for x in range(3)]

        op2 = BashOperator.partial(
            task_id='task_mapped',
            bash_command="echo $MY_VAR",
        ).expand(env=gen_vars())

        One thing I noticed in your example: it appears you are mapping
        from two upstream tasks, for two mapped arguments. You may have
        noticed that this results in a cartesian product (maybe that's
        desirable for you?). This is an area we are actively working
        on, so that you can provide multiple kwargs to map from a
        single task.
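
        To illustrate with a toy sketch (literal lists here, but the same applies
        when the values come from upstream tasks): expanding over two keyword
        arguments maps over every combination of values.

            # 2 commands x 3 env dicts = 6 mapped task instances (cartesian product)
            op3 = BashOperator.partial(task_id="cross_product").expand(
                bash_command=["echo a", "echo b"],
                env=[{"X": "1"}, {"X": "2"}, {"X": "3"}],
            )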



        On Wed, Jun 15, 2022 at 3:36 AM Tal Nagar <tal.na...@evogene.com> wrote:

            Hi Joe

            Not sure if it's the recommended way to do it, but what
            worked for me is rendering the templates in a Python task
            rather than directly in the KubernetesPodOperator.

            For example:

                @task()
                def create_cmds(**context):
                    run_params = context["params"]
                    size = int(run_params['size'])
                    arr = []
                    for s in range(size):
                        arr.append([f'echo hi {s}'])
                    return arr

                say_hi = KubernetesPodOperator.partial(
                    image="alpine",
                    cmds=["/bin/sh", "-c"],
                    task_id='test',
                    name='test'
                ).expand(arguments=create_cmds())
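
            For completeness, run_params["size"] has to be supplied from somewhere.
            One option (just a sketch, the DAG name is a placeholder) is to give it a
            default in the DAG definition and override it when triggering with config:

                from datetime import datetime
                from airflow import DAG

                dag = DAG(
                    dag_id="say_hi_mapped",
                    start_date=datetime(2022, 6, 1),
                    schedule_interval=None,
                    params={"size": 3},  # default; can be overridden at trigger time
                )

            e.g. airflow dags trigger say_hi_mapped --conf '{"size": 5}' (assuming
            dag_run_conf_overrides_params is enabled in your deployment).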

            *From:* Joe Auty <joea...@gmail.com>
            *Sent:* Tuesday, June 14, 2022 10:03 AM
            *To:* users@airflow.apache.org
            *Subject:* Templated fields and dynamic task mapping

            Hello,

            I'm trying to understand the docs here:
            https://airflow.apache.org/docs/apache-airflow/2.3.2/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
            specifically this section:


                If you want to interpolate values either call
                task.render_template yourself, or use interpolation:


            In the example of what doesn't work in the previous
            section, we have:

                printer.expand(val=make_list())


            What should the corrected version of this line be? In other
            words, how would I call make_list, passing in the context,
            so that I can send templated fields to my mapping function?
            Here is a more specific use case:


                KubernetesPodOperator.partial(
                    task_id="schema-dump-input",
                    namespace=NAMESPACE,
                    image=REGISTRY_URL + "/postgres-client:12",
                    name="pg-schemadump",
                    in_cluster=True,
                    hostnetwork=False,
                    max_active_tis_per_dag=1,
                    dag=dag
                ).expand(
                    cmds=schema_dump_input_cmds(ds),
                    env_vars=schema_dump_input_env_vars(ds)
                )

            In this example, ds has no value because it isn't defined
            anywhere, and {{ ds }} doesn't work either (it doesn't get
            interpolated and comes through as the literal string
            "{{ ds }}"). How can I pass in a templated field such as
            {{ ds }}?

            Thanks very much in advance!


