It looks like the context is not passed through:

def schema_dump_input_cmds(**context):
    print(context)

schema_dump = KubernetesPodOperator.partial(
    task_id="schema-dump-input",
    namespace=NAMESPACE,
    image=REGISTRY_URL + "/postgres-client:12-" + AGENT_VERSION,
    .. snipped
    max_active_tis_per_dag=1,
    dag=dag
).expand(
    cmds=schema_dump_input_cmds(),
    env_vars=schema_dump_input_env_vars()
)


I'm seeing an empty object in my logs. Any idea why, or any suggestions here?




Daniel Standish wrote on 2022-06-15 12:57 PM:
Turns out this issue is actually documented here: https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact

On Wed, Jun 15, 2022 at 12:33 PM Daniel Standish <daniel.stand...@astronomer.io> wrote:

    Yeah it does seem that templating does not work with expanded
    params at the moment.  No promises but I would bet this will
    change at some point.  Seems reasonable and I can't think of a
    technical problem.

    Tal's approach looks good.  I came up with basically the same
    thing when testing this out:

    @dag.task
    def gen_vars(ds=None):
        return [{"MY_VAR": ds} for x in range(3)]

    op2 = BashOperator.partial(
        task_id='task_mapped',
        bash_command="echo $MY_VAR",
    ).expand(env=gen_vars())

    One thing I noticed in your example: it appears you are mapping
    two arguments from two upstream tasks.  As you may have noticed,
    this results in a cartesian product (maybe this is desirable for
    you?).  This is an area we are actively working on, so that you
    can provide multiple kwargs to map from a single task.
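
To make the cartesian-product point concrete, here is a minimal plain-Python sketch (no Airflow involved) of how many combinations result when two mapped arguments each come from their own list. The list contents are made up for illustration; only the counting matters.

```python
from itertools import product

# Hypothetical outputs of two separate upstream tasks.
cmds = [["echo", "schema_a"], ["echo", "schema_b"]]
env_vars = [{"DS": "2022-06-14"}, {"DS": "2022-06-15"}, {"DS": "2022-06-16"}]


def cross(cmds, env_vars):
    """Return every (cmds, env_vars) combination as a dict of keyword arguments."""
    return [{"cmds": c, "env_vars": e} for c, e in product(cmds, env_vars)]


combos = cross(cmds, env_vars)
print(len(combos))  # 2 * 3 = 6
```

With 2 and 3 inputs this gives 6 combinations rather than a one-to-one pairing.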



    On Wed, Jun 15, 2022 at 3:36 AM Tal Nagar <tal.na...@evogene.com> wrote:

        Hi Joe

        Not sure if it's the recommended way to do it, however what
        worked for me is rendering the templates in a python task and
        not directly in the KubernetesPodOperator.

        For example:

            @task()
            def create_cmds(**context):
                run_params = context["params"]
                size = int(run_params['size'])
                arr = []
                for s in range(size):
                    arr.append([f'echo hi {s}'])
                return arr

            say_hi = KubernetesPodOperator.partial(
                image="alpine",
                cmds=["/bin/sh", "-c"],
                task_id='test',
                name='test'
            ).expand(arguments=create_cmds())
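
The same pattern can be sketched for the `ds` case from the original question. The list-building logic below is a plain function so that it runs without Airflow. `build_dump_cmds` and the schema names are hypothetical, and the assumption is that in a DAG this logic would sit inside a `@task` function that receives `ds` from the context and whose result is passed to `.expand(...)`.

```python
def build_dump_cmds(ds, schemas):
    """Return one argument list per schema, with the date already interpolated."""
    return [[f"echo dump {schema} for {ds}"] for schema in schemas]


# Hypothetical inputs: a logical date string and two schema names.
cmds = build_dump_cmds("2022-06-14", ["public", "audit"])
for c in cmds:
    print(c)
```

Because the date is substituted inside the Python function, the operator receives finished strings and no template rendering is needed on the mapped argument.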

        *From:* Joe Auty <joea...@gmail.com>
        *Sent:* Tuesday, June 14, 2022 10:03 AM
        *To:* users@airflow.apache.org
        *Subject:* Templated fields and dynamic task mapping

        Hello,

        I'm trying to understand the docs here:
        https://airflow.apache.org/docs/apache-airflow/2.3.2/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
        specifically this section:


            If you want to interpolate values either call
            task.render_template yourself, or use interpolation:


        In the example in the previous section of what doesn't work we
        have:

            printer.expand(val=make_list())


        What should the corrected version of this line be? IOW, how
        would I call make_list passing in the context so that I can
        send templated fields to my mapping function? Here is a more
        specific use case:


            KubernetesPodOperator.partial(
                task_id="schema-dump-input",
                namespace=NAMESPACE,
                image=REGISTRY_URL + "/postgres-client:12",
                name="pg-schemadump",
                in_cluster=True,
                hostnetwork=False,
                max_active_tis_per_dag=1,
                dag=dag
            ).expand(
                cmds=schema_dump_input_cmds(ds),
                env_vars=schema_dump_input_env_vars(ds)
            )

        In this example, ds has no value of course because it is not
        defined anywhere, and of course {{ ds }} doesn't work either
        (this doesn't get interpolated and is registered as literally
        "{{ ds }}"). How can I pass in a templated field, such as
        {{ ds }}?

        Thanks very much in advance!


