From “admin” menu, chose “variable”, input your “key”and “val” values. In 
python scripts add
keyname = Variable.get("key"), use keyname in your dags


Richard Li
Big Data Engineer @ Product/Service Innovation Development
222 W. Merchandise Mart Plaza | Suite 850 | Chicago, Illinois 60654
6308632669


From: Ayush Goel <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, July 6, 2020 at 12:36 PM
To: "[email protected]" <[email protected]>
Subject: [External] How to access external arguments when passing them using 
REST API in airflow?

Hi,


 I tried triggering a airflow dag using the curl: 
http://localhost:8080/api/experimental/dags/api_test/dag_runs<https://urldefense.com/v3/__http:/localhost:8080/api/experimental/dags/api_test/dag_runs__;!!IIU9BLNPZ2ob!bzFlFU4TxDdurM-OBcl6Fpj1f_AHAfkulbMVFtUPEX2sxn3leOZOmRvP6U6aSw$>
 -H 'Cache-Control: no-cache'
-H 'Content-Type: application/json'
-d '{"conf":"{"input_path":"value", "output_path":"value" }"}'

I am able to trigger the workflow successfully but now I want to access this 
input_path and output_path inside my PythonOperator code in the airflow dag.

Can someone help in this regard?

My Dag Code:

from datetime import timedelta

import logging



# The DAG object; we'll need this to instantiate a DAG

from airflow import DAG

# Operators; we need this to operate!

from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago



# These args will get passed on to each operator

# You can override them on a per-task basis during operator initialization

default_args = {

    'owner': 'someone',

    'depends_on_past': False,

    'start_date': days_ago(2),

    'email': ['[email protected]<mailto:[email protected]>'],

    'email_on_failure': False,

    'email_on_retry': False,

    'retries': 1,

    'retry_delay': timedelta(minutes=2)

}

dag = DAG(

    'api_test',

    default_args=default_args,

    description='testing rest api calls',

    schedule_interval=None,

)

def run_this_func(**kwargs):

    logging.info("Trying to print the logs")

    logging.info(kwargs['conf'])

    for key,value in kwargs['conf'].items():

        logging.info(key)

        logging.info(value)

    
#logging.info<https://urldefense.com/v3/__http:/logging.info__;!!IIU9BLNPZ2ob!bzFlFU4TxDdurM-OBcl6Fpj1f_AHAfkulbMVFtUPEX2sxn3leOZOmRtQNTdTJw$>(kwargs['dag_run'].conf['input_path'])

    
#logging.info<https://urldefense.com/v3/__http:/logging.info__;!!IIU9BLNPZ2ob!bzFlFU4TxDdurM-OBcl6Fpj1f_AHAfkulbMVFtUPEX2sxn3leOZOmRtQNTdTJw$>(kwargs['conf']['output_path'])



run_this = PythonOperator(

    task_id='run_this',

    python_callable=run_this_func,

    dag=dag,

    provide_context=True,

)



run_this

I tried printing the arguments by 
logging.info<https://urldefense.com/v3/__http:/logging.info__;!!IIU9BLNPZ2ob!bzFlFU4TxDdurM-OBcl6Fpj1f_AHAfkulbMVFtUPEX2sxn3leOZOmRtQNTdTJw$>(kwargs['dag_run'].conf['input_path'])
 
logging.info<https://urldefense.com/v3/__http:/logging.info__;!!IIU9BLNPZ2ob!bzFlFU4TxDdurM-OBcl6Fpj1f_AHAfkulbMVFtUPEX2sxn3leOZOmRtQNTdTJw$>(kwargs['conf']['output_path'])

But both of these lines are giving me errors. Is there any way of accessing the 
input_path and output_path?

Thanks and Regards,
Ayush Goel

Reply via email to