Turns out this issue is actually documented here:
https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
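
On the cartesian product Daniel mentions below: here is a rough, Airflow-free sketch of the difference between expanding two arguments independently and pairing them up element by element. The helper names (cross_product, pairwise) and the sample values are made up for illustration and are not part of any Airflow API; the point is only to show how many mapped task instances each approach would produce.

```python
from itertools import product

# Hypothetical values standing in for what two upstream tasks might return.
cmds = [["pg_dump", "-t", "a"], ["pg_dump", "-t", "b"]]
env_vars = [{"RUN_DATE": "2022-06-14"}, {"RUN_DATE": "2022-06-15"}]


def cross_product(**mapped):
    """Every combination of the mapped values, one dict per would-be task instance."""
    keys = list(mapped)
    return [dict(zip(keys, combo)) for combo in product(*mapped.values())]


def pairwise(**mapped):
    """Element-wise pairing: the i-th value of each argument travels together."""
    keys = list(mapped)
    return [dict(zip(keys, combo)) for combo in zip(*mapped.values())]


crossed = cross_product(cmds=cmds, env_vars=env_vars)
paired = pairwise(cmds=cmds, env_vars=env_vars)

print(len(crossed))  # 2 x 2 combinations
print(len(paired))   # one entry per index
```

If the pairing is what you actually want, the shape to aim for is the second one: a single upstream task that returns one list with everything each mapped instance needs, as in the gen_vars and create_cmds examples quoted below.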

On Wed, Jun 15, 2022 at 12:33 PM Daniel Standish <
daniel.stand...@astronomer.io> wrote:

> Yeah it does seem that templating does not work with expanded params at
> the moment.  No promises but I would bet this will change at some point.
> Seems reasonable and I can't think of a technical problem.
>
> Tal's approach looks good.  I came up with basically the same thing when
> testing this out:
>
> @dag.task
> def gen_vars(ds=None):
>     return [{"MY_VAR": ds} for x in range(3)]
>
> op2 = BashOperator.partial(
>     task_id='task_mapped',
>     bash_command="echo $MY_VAR",
> ).expand(
>     env=gen_vars(),
> )
>
> One thing I noticed in your example: it looks like you are mapping two
> arguments from two upstream tasks.  As you may have noticed, this results
> in a cartesian product (maybe that is what you want?).  This is an area we
> are actively working on, so that you can provide multiple kwargs to map
> from a single task.
>
>
>
> On Wed, Jun 15, 2022 at 3:36 AM Tal Nagar <tal.na...@evogene.com> wrote:
>
>> Hi Joe
>>
>> Not sure if it's the recommended way to do it, however what worked for me
>> is rendering the templates in a python task and not directly in the
>> KubernetesPodOperator.
>>
>> For example:
>>
>>     @task()
>>     def create_cmds(**context):
>>         run_params = context["params"]
>>         size = int(run_params['size'])
>>         arr = []
>>         for s in range(size):
>>             arr.append([f'echo hi {s}'])
>>         return arr
>>
>>     say_hi = KubernetesPodOperator.partial(
>>         image="alpine",
>>         cmds=["/bin/sh", "-c"],
>>         task_id='test',
>>         name='test'
>>     ).expand(arguments=create_cmds())
>>
>> *From:* Joe Auty <joea...@gmail.com>
>> *Sent:* Tuesday, June 14, 2022 10:03 AM
>> *To:* users@airflow.apache.org
>> *Subject:* Templated fields and dynamic task mapping
>>
>>
>>
>> Hello,
>>
>> I'm trying to understand the docs here:
>> https://airflow.apache.org/docs/apache-airflow/2.3.2/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
>> , specifically this section:
>>
>>
>> If you want to interpolate values either call task.render_template yourself,
>> or use interpolation:
>>
>>
>> In the example in the previous section of what doesn't work we have:
>>
>> printer.expand(val=make_list())
>>
>>
>> What should the corrected version of this line be? IOW, how would I call
>> make_list passing in the context so that I can send templated fields to my
>> mapping function? Here is a more specific use case:
>>
>>
>> KubernetesPodOperator.partial(
>>         task_id="schema-dump-input",
>>         namespace=NAMESPACE,
>>         image=REGISTRY_URL + "/postgres-client:12",
>>         name="pg-schemadump",
>>         in_cluster=True,
>>         hostnetwork=False,
>>         max_active_tis_per_dag=1,
>>         dag=dag
>>     ).expand(
>>         cmds=schema_dump_input_cmds(ds),
>>         env_vars=schema_dump_input_env_vars(ds)
>>     )
>>
>> In this example, ds has no value of course because it is not defined
>> anywhere, and of course {{ ds }} doesn't work either (this doesn't get
>> interpolated and is registered as literally "{{ ds }}"). How can I pass in
>> a templated field, such as {{ ds }}?
>>
>> Thanks very much in advance!
>>
>
