It doesn't look like you've made schema_dump_input_cmds a task. Could you
provide the full DAG?

Accessing context variables in the TaskFlow API is documented here:
https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html#accessing-context-variables-in-decorated-tasks
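
A minimal sketch of what that could look like, assuming the function is wrapped
with the TaskFlow @task decorator and asks for ds as a keyword argument. The
helper build_schema_dump_cmds, the schema names, and the echo commands are
hypothetical placeholders, not taken from your DAG. The guarded import is only
there so the snippet can be run without Airflow installed.

```python
try:
    from airflow.decorators import task
except ImportError:
    task = None  # Airflow not installed; only the plain helper is usable


def build_schema_dump_cmds(ds):
    """Return one command list per mapped task instance (placeholder commands)."""
    schemas = ["schema_a", "schema_b"]  # hypothetical schema names
    return [["echo", f"dump {schema} for {ds}"] for schema in schemas]


if task is not None:

    @task
    def schema_dump_input_cmds(ds=None):
        # Airflow fills in `ds` from the task context at run time
        # because it is declared as a keyword argument.
        return build_schema_dump_cmds(ds)
```

Inside the DAG, the decorated function would then be called in the expand()
call, e.g. .expand(cmds=schema_dump_input_cmds()), as in your snippet.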

On Wed, Jun 15, 2022 at 5:07 PM Joe Auty <joea...@gmail.com> wrote:

> It looks like the context is not passed through:
>
> def schema_dump_input_cmds(**context):
>     print(context)
>
> schema_dump = KubernetesPodOperator.partial(
>     task_id="schema-dump-input",
>     namespace=NAMESPACE,
>     image=REGISTRY_URL + "/postgres-client:12-" + AGENT_VERSION,
>     .. snipped
>     max_active_tis_per_dag=1,
>     dag=dag
> ).expand(
>     cmds=schema_dump_input_cmds(),
>     env_vars=schema_dump_input_env_vars()
> )
>
>
> I'm seeing an empty object in my logs. Any idea why, or any suggestions
> here?
>
>
>
>
> Daniel Standish wrote on 2022-06-15 12:57 PM:
>
> Turns out this issue is actually documented here:
> https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
>
> On Wed, Jun 15, 2022 at 12:33 PM Daniel Standish <
> daniel.stand...@astronomer.io> wrote:
>
>> Yeah it does seem that templating does not work with expanded params at
>> the moment.  No promises but I would bet this will change at some point.
>> Seems reasonable and I can't think of a technical problem.
>>
>> Tal's approach looks good.  I came up with basically the same thing when
>> testing this out:
>>
>> @dag.task
>> def gen_vars(ds=None):
>>     return [{"MY_VAR": ds} for x in range(3)]
>>
>> op2 = BashOperator.partial(
>>     task_id='task_mapped',
>>     bash_command="echo $MY_VAR",
>> ).expand(env=gen_vars())
>>
>> One thing I noticed in your example is that you appear to be mapping
>> from two upstream tasks, one for each mapped argument. As you may have
>> noticed, this results in a cartesian product (maybe that is what you
>> want?). This is an area we are actively working on, so that you can
>> provide multiple kwargs to map from a single task.
>>
>>
>>
>> On Wed, Jun 15, 2022 at 3:36 AM Tal Nagar <tal.na...@evogene.com> wrote:
>>
>>> Hi Joe
>>>
>>> I'm not sure it's the recommended way to do it, but what worked for me
>>> is rendering the templates in a Python task rather than directly in the
>>> KubernetesPodOperator.
>>>
>>> For example:
>>>
>>>     @task()
>>>     def create_cmds(**context):
>>>         run_params = context["params"]
>>>         size = int(run_params['size'])
>>>         arr = []
>>>         for s in range(int(size)):
>>>             arr.append([f'echo hi {s}'])
>>>         return arr
>>>
>>>     say_hi = KubernetesPodOperator.partial(
>>>         image="alpine",
>>>         cmds=["/bin/sh", "-c"],
>>>         task_id='test',
>>>         name='test'
>>>     ).expand(arguments=create_cmds())
>>>
>>> *From:* Joe Auty <joea...@gmail.com>
>>> *Sent:* Tuesday, June 14, 2022 10:03 AM
>>> *To:* users@airflow.apache.org
>>> *Subject:* Templated fields and dynamic task mapping
>>>
>>>
>>>
>>> Hello,
>>>
>>> I'm trying to understand the docs here:
>>> https://airflow.apache.org/docs/apache-airflow/2.3.2/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
>>> , specifically this section:
>>>
>>>
>>> If you want to interpolate values either call task.render_template yourself,
>>> or use interpolation:
>>>
>>>
>>> In the example in the previous section of what doesn't work we have:
>>>
>>> printer.expand(val=make_list())
>>>
>>>
>>> What should the corrected version of this line be? IOW, how would I call
>>> make_list passing in the context so that I can send templated fields to my
>>> mapping function? Here is a more specific use case:
>>>
>>>
>>> KubernetesPodOperator.partial(
>>>         task_id="schema-dump-input",
>>>         namespace=NAMESPACE,
>>>         image=REGISTRY_URL + "/postgres-client:12",
>>>         name="pg-schemadump",
>>>         in_cluster=True,
>>>         hostnetwork=False,
>>>         max_active_tis_per_dag=1,
>>>         dag=dag
>>>     ).expand(
>>>         cmds=schema_dump_input_cmds(ds),
>>>         env_vars=schema_dump_input_env_vars(ds)
>>>     )
>>>
>>> In this example, ds has no value of course because it is not defined
>>> anywhere, and of course {{ ds }} doesn't work either (this doesn't get
>>> interpolated and is registered as literally "{{ ds }}"). How can I pass in
>>> a templated field, such as {{ ds }}?
>>>
>>> Thanks very much in advance!
>>>
>>
>
