From “admin” menu, chose “variable”, input your “key”and “val” values. In
python scripts add
keyname = Variable.get("key"), use keyname in your dags
Richard Li
Big Data Engineer @ Product/Service Innovation Development
222 W. Merchandise Mart Plaza | Suite 850 | Chicago, Illinois 60654
6308632669
From: Ayush Goel <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, July 6, 2020 at 12:36 PM
To: "[email protected]" <[email protected]>
Subject: [External] How to access external arguments when passing them using
REST API in airflow?
Hi,
I tried triggering a airflow dag using the curl:
http://localhost:8080/api/experimental/dags/api_test/dag_runs<https://urldefense.com/v3/__http:/localhost:8080/api/experimental/dags/api_test/dag_runs__;!!IIU9BLNPZ2ob!bzFlFU4TxDdurM-OBcl6Fpj1f_AHAfkulbMVFtUPEX2sxn3leOZOmRvP6U6aSw$>
-H 'Cache-Control: no-cache'
-H 'Content-Type: application/json'
-d '{"conf":"{"input_path":"value", "output_path":"value" }"}'
I am able to trigger the workflow successfully but now I want to access this
input_path and output_path inside my PythonOperator code in the airflow dag.
Can someone help in this regard?
My Dag Code:
from datetime import timedelta
import logging
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'someone',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['[email protected]<mailto:[email protected]>'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=2)
}
dag = DAG(
'api_test',
default_args=default_args,
description='testing rest api calls',
schedule_interval=None,
)
def run_this_func(**kwargs):
logging.info("Trying to print the logs")
logging.info(kwargs['conf'])
for key,value in kwargs['conf'].items():
logging.info(key)
logging.info(value)
#logging.info<https://urldefense.com/v3/__http:/logging.info__;!!IIU9BLNPZ2ob!bzFlFU4TxDdurM-OBcl6Fpj1f_AHAfkulbMVFtUPEX2sxn3leOZOmRtQNTdTJw$>(kwargs['dag_run'].conf['input_path'])
#logging.info<https://urldefense.com/v3/__http:/logging.info__;!!IIU9BLNPZ2ob!bzFlFU4TxDdurM-OBcl6Fpj1f_AHAfkulbMVFtUPEX2sxn3leOZOmRtQNTdTJw$>(kwargs['conf']['output_path'])
run_this = PythonOperator(
task_id='run_this',
python_callable=run_this_func,
dag=dag,
provide_context=True,
)
run_this
I tried printing the arguments by
logging.info<https://urldefense.com/v3/__http:/logging.info__;!!IIU9BLNPZ2ob!bzFlFU4TxDdurM-OBcl6Fpj1f_AHAfkulbMVFtUPEX2sxn3leOZOmRtQNTdTJw$>(kwargs['dag_run'].conf['input_path'])
logging.info<https://urldefense.com/v3/__http:/logging.info__;!!IIU9BLNPZ2ob!bzFlFU4TxDdurM-OBcl6Fpj1f_AHAfkulbMVFtUPEX2sxn3leOZOmRtQNTdTJw$>(kwargs['conf']['output_path'])
But both of these lines are giving me errors. Is there any way of accessing the
input_path and output_path?
Thanks and Regards,
Ayush Goel