Repository: incubator-airflow
Updated Branches:
  refs/heads/master fdb7e9491 -> eb989dcb5
[AIRFLOW-383] Cleanup example qubole operator dag

Closes #1698 from yogesh2021/AIRFLOW-383

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb989dcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb989dcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb989dcb

Branch: refs/heads/master
Commit: eb989dcb54387c218683de9dff0a12950dce147b
Parents: fdb7e94
Author: Yogesh Garg <yoge...@qubole.com>
Authored: Mon Aug 1 07:49:54 2016 -0700
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Mon Aug 1 07:49:54 2016 -0700

----------------------------------------------------------------------
 .../example_dags/example_qubole_operator.py | 23 +++++++++-----------
 1 file changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb989dcb/airflow/contrib/example_dags/example_qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py
index 63cccd3..ff363e6 100644
--- a/airflow/contrib/example_dags/example_qubole_operator.py
+++ b/airflow/contrib/example_dags/example_qubole_operator.py
@@ -32,7 +32,9 @@ default_args = {
     'email_on_retry': False
 }
 
-dag = DAG('example_qubole_operator', default_args=default_args)
+# NOTE:: This is only an example DAG to highlight usage of QuboleOperator in various scenarios,
+# some of the tasks may or may not work based on your QDS account setup
+dag = DAG('example_qubole_operator', default_args=default_args, schedule_interval='@daily')
 
 def compare_result(ds, **kwargs):
     ti = kwargs['ti']
@@ -45,14 +47,15 @@ t1 = QuboleOperator(
     command_type='hivecmd',
     query='show tables',
     cluster_label='default',
-    fetch_logs=True,
-    tags='aiflow_example_run',
+    fetch_logs=True, # If true, will fetch qubole command logs and concatenate them into corresponding airflow task logs
+    tags='aiflow_example_run', # To attach tags to qubole command, auto attach 3 tags - dag_id, task_id, run_id
+    qubole_conn_id='qubole_default', # Connection id to submit commands inside QDS, if not set "qubole_default" is used
     dag=dag)
 
 t2 = QuboleOperator(
     task_id='hive_s3_location',
     command_type="hivecmd",
-    script_location="s3n://dev.canopydata.com/airflow/show_table.hql",
+    script_location="s3n://public-qubole/qbol-library/scripts/show_table.hql",
     notfiy=True,
     tags=['tag1', 'tag2'],
     trigger_rule="all_done",
@@ -76,14 +79,12 @@ branching = BranchPythonOperator(
     dag=dag)
 branching.set_upstream(t3)
 
-
 join = DummyOperator(
     task_id='join',
     trigger_rule='one_success',
     dag=dag
 )
 
-
 t4 = QuboleOperator(
     task_id='hadoop_jar_cmd',
     command_type='hadoopcmd',
@@ -95,7 +96,7 @@ t4 = QuboleOperator(
 t5 = QuboleOperator(
     task_id='pig_cmd',
     command_type="pigcmd",
-    script_location="s3://paid-qubole/PigAPIDemo/scripts/script1-hadoop-s3-small.pig",
+    script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
     parameters="key1=value1 key2=value2",
     trigger_rule="all_done",
     dag=dag)
@@ -104,7 +105,6 @@ t4.set_upstream(branching)
 t5.set_upstream(t4)
 t5.set_downstream(join)
 
-
 t6 = QuboleOperator(
     task_id='presto_cmd',
     command_type='prestocmd',
@@ -114,7 +114,7 @@ t6 = QuboleOperator(
 t7 = QuboleOperator(
     task_id='shell_cmd',
     command_type="shellcmd",
-    script_location="s3://paid-qubole/ShellDemo/data/excite-small.sh",
+    script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
     parameters="param1 param2",
     trigger_rule="all_done",
     dag=dag)
@@ -123,7 +123,6 @@ t6.set_upstream(branching)
 t7.set_upstream(t6)
 t7.set_downstream(join)
 
-
 t8 = QuboleOperator(
     task_id='db_query',
     command_type='dbtapquerycmd',
@@ -146,7 +145,6 @@ t8.set_upstream(branching)
 t9.set_upstream(t8)
 t9.set_downstream(join)
 
-
 t10 = QuboleOperator(
     task_id='db_import',
     command_type='dbimportcmd',
@@ -186,7 +184,7 @@ t11 = QuboleOperator(
     task_id='spark_cmd',
     command_type="sparkcmd",
     program=prog,
-    language='python',
+    language='scala',
     arguments='--class SparkPi',
     tags='aiflow_example_run',
     dag=dag)
@@ -194,4 +192,3 @@ t11 = QuboleOperator(
 t11.set_upstream(branching)
 t11.set_downstream(t10)
 t10.set_downstream(join)
-