It looks like the context is not passed through:
    def schema_dump_input_cmds(**context):
        print(context)

    schema_dump = KubernetesPodOperator.partial(
        task_id="schema-dump-input",
        namespace=NAMESPACE,
        image=REGISTRY_URL + "/postgres-client:12-" + AGENT_VERSION,
        .. snipped
        max_active_tis_per_dag=1,
        dag=dag
    ).expand(
        cmds=schema_dump_input_cmds(),
        env_vars=schema_dump_input_env_vars()
    )
I'm seeing an empty object in my logs. Any idea why, or any suggestions
here?
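(A minimal sketch, based on the pattern Daniel shows further down the thread: instead of relying on **context, the mapped callable can declare the specific context variables it needs, e.g. ds=None, and Airflow fills them in when the task runs. The function body and return value below are illustrative only, not from the thread.)

    from airflow.decorators import task

    @task()
    def schema_dump_input_cmds(ds=None):
        # ds is populated by Airflow because it is declared in the signature
        print(ds)
        # hypothetical command list: one entry per mapped pod
        return [["echo", f"dump for {ds}"]]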
Daniel Standish wrote on 2022-06-15 12:57 PM:
Turns out this issue is actually documented here:
https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
On Wed, Jun 15, 2022 at 12:33 PM Daniel Standish <daniel.stand...@astronomer.io> wrote:
Yeah it does seem that templating does not work with expanded
params at the moment. No promises but I would bet this will
change at some point. Seems reasonable and I can't think of a
technical problem.
Tal's approach looks good. I came up with basically the same thing when testing this out:

    @dag.task
    def gen_vars(ds=None):
        return [{"MY_VAR": ds} for x in range(3)]

    op2 = BashOperator.partial(
        task_id='task_mapped',
        bash_command="echo $MY_VAR",
    ).expand(
        env=gen_vars(),
    )
One thing I noticed in your example: it appears you may be mapping from two upstream tasks, one for each mapped argument. As you may have noticed, this results in a cartesian product of the two lists (maybe that is desirable for you?). This is an area we are actively working on, so that you can provide multiple kwargs to map from a single task.
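(To make the cartesian-product behaviour concrete, here is a hedged sketch with made-up task names and values; as with the snippets above, it is assumed to live inside a DAG definition. Expanding over two mapped arguments creates one task instance per combination.)

    from airflow.decorators import task
    from airflow.operators.bash import BashOperator

    @task
    def gen_cmds():
        return ["echo a", "echo b"]  # 2 values

    @task
    def gen_envs():
        return [{"MY_VAR": "x"}, {"MY_VAR": "y"}, {"MY_VAR": "z"}]  # 3 values

    # 2 x 3 = 6 mapped task instances, one per (bash_command, env) pair
    BashOperator.partial(task_id="task_mapped_pair").expand(
        bash_command=gen_cmds(),
        env=gen_envs(),
    )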
On Wed, Jun 15, 2022 at 3:36 AM Tal Nagar <tal.na...@evogene.com> wrote:
Hi Joe
Not sure if it's the recommended way to do it, but what worked for me is rendering the templates in a Python task rather than directly in the KubernetesPodOperator. For example:
    @task()
    def create_cmds(**context):
        run_params = context["params"]
        size = int(run_params['size'])
        arr = []
        for s in range(size):
            arr.append([f'echo hi {s}'])
        return arr

    say_hi = KubernetesPodOperator.partial(
        image="alpine",
        cmds=["/bin/sh", "-c"],
        task_id='test',
        name='test'
    ).expand(arguments=create_cmds())
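(For completeness, a hedged sketch of where context["params"]["size"] comes from: params can be given a default on the DAG and overridden per run. The dag_id and default value below are made up.)

    from airflow import DAG
    import pendulum

    with DAG(
        dag_id="say_hi_example",  # hypothetical
        start_date=pendulum.datetime(2022, 6, 1),
        schedule_interval=None,
        # default; override per run, e.g.
        #   airflow dags trigger say_hi_example --conf '{"size": 5}'
        params={"size": 3},
    ) as dag:
        ...  # create_cmds() / say_hi from the snippet above go here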
*From:* Joe Auty <joea...@gmail.com>
*Sent:* Tuesday, June 14, 2022 10:03 AM
*To:* users@airflow.apache.org
*Subject:* Templated fields and dynamic task mapping
Hello,
I'm trying to understand the docs here:
https://airflow.apache.org/docs/apache-airflow/2.3.2/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
specifically this section:

    If you want to interpolate values either call task.render_template yourself, or use interpolation:
In the previous section's example of what doesn't work, we have:

    printer.expand(val=make_list())

What should the corrected version of this line be? In other words, how would I call make_list, passing in the context, so that I can send templated fields to my mapping function? Here is a more specific use case:
    KubernetesPodOperator.partial(
        task_id="schema-dump-input",
        namespace=NAMESPACE,
        image=REGISTRY_URL + "/postgres-client:12",
        name="pg-schemadump",
        in_cluster=True,
        hostnetwork=False,
        max_active_tis_per_dag=1,
        dag=dag
    ).expand(
        cmds=schema_dump_input_cmds(ds),
        env_vars=schema_dump_input_env_vars(ds)
    )
In this example, ds has no value, of course, because it is not defined anywhere, and {{ ds }} doesn't work either: it never gets interpolated and comes through as the literal string "{{ ds }}". How can I pass in a templated field such as {{ ds }}?
Thanks very much in advance!