[ https://issues.apache.org/jira/browse/AIRFLOW-392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Imberman closed AIRFLOW-392.
-----------------------------------
    Resolution: Auto Closed

> DAG runs on strange schedule in the past when deployed
> ------------------------------------------------------
>
>                 Key: AIRFLOW-392
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-392
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.7.1.3
>         Environment: AWS ElasticBeanstalk as a Docker application running in an Ubuntu-based container
>            Reporter: David Klosowski
>            Assignee: Norman Mu
>            Priority: Major
>
> We just deployed a new DAG ('weekly-no-track') that depends on 7 task runs
> of another DAG ('daily-no-track'). When the DAG is deployed, the scheduler
> creates and runs multiple DAG runs in the past (yesterday it ran for
> 6/12/2016 and 6/05/2016), despite the start date being set to the
> deployment date.
> It would be difficult to include all of the code used by the DAG, since it
> references several in-house Python libraries that we eventually want to
> open source, but I've included the relevant parts below. Let me know if
> this is all clear and what I can do to help, or if any insight can be
> provided as to why this is occurring and how we might fix it.
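> For context, my expectation (which may be wrong, hence this issue) is that a
> run's execution date marks the start of its schedule interval and that the
> run is only triggered once the interval has passed, so a weekly
> schedule_interval should produce at most one run per fully elapsed week
> after the start date and nothing before it. A minimal sketch of the
> arithmetic I expected (plain datetime math, not the scheduler's actual code;
> the 'now' value is a hypothetical deployment time):
> {code}
> from datetime import datetime, timedelta
> start_date = datetime(2016, 8, 8)   # same value as run_for_date in the DAG below
> interval = timedelta(days=7)        # the DAG's schedule_interval
> now = datetime(2016, 8, 16)         # hypothetical time of deployment
> # Expected: one execution date per fully elapsed interval, none before start_date.
> execution_date = start_date
> while execution_date + interval <= now:
>     print(execution_date)           # prints only 2016-08-08 00:00:00 here
>     execution_date += interval
> {code}
> The DAG module itself: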
> {code}
> from __future__ import division, print_function
> from airflow.models import DAG
> from airflow.operators import DummyOperator, ExternalTaskSensor, TimeDeltaSensor
> from tn_etl_tools.aws.emr import EmrConfig, HiveConfig, read_cluster_templates
> from tn_etl_tools.aws.emr import EmrService, EmrServiceWrapper, HiveStepBuilder
> from tn_etl_tools.datesupport import ts_add
> from tn_etl_tools.hive import HivePartitions
> from tn_etl_tools.yaml import YamlLoader
> from datetime import datetime, timedelta
> from dateutil.relativedelta import relativedelta, SU, MO, TU, WE, TH, FR, SA
> from common_args import merge_dicts, CommonHiveParams
> from operator_builders import add_tasks, emr_hive_operator
> import os
> # === configs
> config_dir = os.getenv('DAG_CONFIG_DIR', '/usr/local/airflow/config')
> alert_email = os.getenv('AIRFLOW_TO_EMAIL')
> app_properties = YamlLoader.load_yaml(config_dir + '/app.yml')
> emr_cluster_properties = YamlLoader.load_yaml(config_dir + '/emr_clusters.yml')
> emr_config = EmrConfig.load(STAGE=app_properties['STAGE'], **app_properties['EMR'])
> hive_config = HiveConfig.load(STAGE=app_properties['STAGE'], **app_properties['HIVE'])
> emr_cluster_templates = read_cluster_templates(emr_cluster_properties)
> # === /configs
> # TODO: force execution_date = sunday?
> run_for_date = datetime(2016, 8, 8)
> emr_service = EmrService()
> emr_service_wrapper = EmrServiceWrapper(emr_service=emr_service,
>                                         emr_config=emr_config, cluster_templates=emr_cluster_templates)
> hive_step_builder = HiveStepBuilder(hive_config=hive_config)
> hive_params = CommonHiveParams(app_properties_hive=app_properties['HIVE'])
> args = {'owner': 'airflow',
>         'depends_on_past': False,
>         'start_date': run_for_date,
>         'email': [alert_email],
>         'email_on_failure': True,
>         'email_on_retry': False,
>         'retries': 1,
>         'trigger_rule' : 'all_success',
>         'emr_service_wrapper': emr_service_wrapper,
>         'hive_step_builder': hive_step_builder}
> user_defined_macros = {'hive_partitions': HivePartitions,
>                        'ts_add': ts_add}
> params = {'stage': app_properties['STAGE']}
> dag = DAG(dag_id='weekly_no_track', default_args=args, user_defined_macros=user_defined_macros, params=params,
>           schedule_interval=timedelta(days=7),
>           max_active_runs=1)
> # === task definitions
> task_definitions = {
>     'wait-for-dailies': {
>         'operator_type': 'dummy_operator', # hub for custom defined dependencies
>         'operator_args': {},
>         'depends_on': []
>     },
>     'weekly-no-track': {
>         'operator_type': 'emr_hive_operator',
>         'operator_args': {
>             'hive_step': {
>                 'script': 'weekly-no-track-airflow',  # temporary modified script with separate output path
>                 'cluster_name': 'geoprofile',
>                 'script_vars': merge_dicts(hive_params.default_params(), hive_params.rundate_params(), {
>                     'PARTITIONS': '{{hive_partitions.by_day(ts_add(ts, days=-6), ts_add(ts, days=1))}}',
>                 }),
>             }
>         },
>         'depends_on': ['wait-for-dailies']
>     }
> }
> # === /task definitions
> operator_builders = {'emr_hive_operator': emr_hive_operator,
>                      'time_delta_sensor': TimeDeltaSensor,
>                      'dummy_operator': DummyOperator}
> add_tasks(task_definitions, dag=dag, operator_builders=operator_builders)
> # === custom tasks
> downstream_task = dag.get_task('wait-for-dailies')
> for weekday in [MO, TU, WE, TH, FR, SA, SU]:
>     task_id = 'wait-for-daily-{day}'.format(day=weekday)
>     # weekday(-1) subtracts 1 relative week from the given weekday, however if the calculated date is already Monday,
>     # for example, -1 won't change the day.
>     delta = relativedelta(weekday=weekday(-1))
>     sensor = ExternalTaskSensor(task_id=task_id, dag=dag,
>                                 external_dag_id='daily_no_track', external_task_id='daily-no-track',
>                                 execution_delta=delta, timeout=86400)  # 86400 = 24 hours
>     sensor.set_downstream(downstream_task)
> # === /custom tasks
> {code}
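> To illustrate the weekday(-1) comment in the sensor loop above, this is how
> dateutil resolves that relativedelta for two sample dates (the dates are
> arbitrary examples, not actual run dates):
> {code}
> from datetime import datetime
> from dateutil.relativedelta import relativedelta, MO
> # weekday=MO(-1) moves a date back to the most recent Monday, leaving it
> # unchanged if it already falls on a Monday.
> print(datetime(2016, 8, 14) + relativedelta(weekday=MO(-1)))  # 2016-08-08 (Sunday -> previous Monday)
> print(datetime(2016, 8, 8) + relativedelta(weekday=MO(-1)))   # 2016-08-08 (already a Monday)
> {code}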
> Some of the referenced code:
> {{common_args.py}}
> {code}
> from __future__ import division, print_function
> from copy import copy
> class CommonHiveParams(object):
>     def __init__(self, app_properties_hive):
>         super(CommonHiveParams, self).__init__()
>         # TODO: this should be part of a config object
>         self.app_properties_hive = app_properties_hive
>     def default_params(self):
>         return {
>             'HIVE_LIBS_BUCKET': self.app_properties_hive['S3_HIVE_LIB_BUCKET'],
>             'STAGE': '{{params.stage}}',
>         }
>     @staticmethod
>     def rundate_params():
>         return {
>             'YEAR': '{{execution_date.strftime("%Y")}}',
>             'MONTH': '{{execution_date.strftime("%m")}}',
>             'DAY': '{{execution_date.strftime("%d")}}',
>             'HOUR': '{{execution_date.strftime("%H")}}',
>             'MINUTE': '{{execution_date.strftime("%M")}}',
>         }
> def merge_dicts(*dicts):
>     """ Merge provided dicts without modification.
>     Duplicate keys are overwritten with values from the rightmost applicable dict.
>     """
>     if len(dicts) == 0:
>         return {}
>     result = copy(dicts[0])
>     for d in dicts[1:]:
>         result.update(d)
>     return result
> {code}
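> For example, merge_dicts leaves its inputs untouched and lets later dicts win
> on duplicate keys (the values here are made up):
> {code}
> defaults = {'STAGE': 'dev', 'YEAR': '2016'}
> overrides = {'STAGE': 'prod'}
> print(merge_dicts(defaults, overrides))  # {'STAGE': 'prod', 'YEAR': '2016'} (key order may vary)
> print(defaults)                          # {'STAGE': 'dev', 'YEAR': '2016'} -- unchanged
> print(merge_dicts())                     # {}
> {code}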
> {{operator_builders.py}}
> {code}
> """Functions for building operators from dict property definitions."""
> from __future__ import division, print_function
> from tn_airflow_components.operators.emr import EmrHiveOperator, create_emr_operator_with_step_sensor
> # TODO: this should not be a single package. Not every DAG needs EMR as a dependency, for example.
> def emr_hive_operator(task_id, dag, hive_step, **kwargs):
>     return create_emr_operator_with_step_sensor(task_id=task_id, dag=dag,
>                                                 main_operator_class=EmrHiveOperator, main_operator_kwargs=hive_step,
>                                                 **kwargs)
> def add_tasks(task_definitions, dag, operator_builders):
>     """Add tasks from dict definitions
>     :param task_definitions: dict of task definitions. Keys in the top-level dict are used as the task IDs
>     :type task_definitions: dict
>     :param dag: the DAG in which to define the tasks
>     :type dag: airflow.models.DAG
>     :param operator_builders: mapping of str 'operator_type' values to operator builder functions
>     :type operator_builders: dict
>     """
>     for task_id in task_definitions.keys():
>         task_definition = task_definitions[task_id]
>         operator_type = task_definition['operator_type']
>         operator = operator_builders[operator_type](task_id=task_id, dag=dag, **task_definition['operator_args'])
>         if task_definition['depends_on']:
>             for dependency in task_definition['depends_on']:
>                 operator.set_upstream(dag.get_task(dependency))
> {code}
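> For reference, a minimal, self-contained example of the task_definitions
> format that add_tasks expects, using only DummyOperator (the DAG id and task
> ids here are made up):
> {code}
> from datetime import datetime, timedelta
> from airflow.models import DAG
> from airflow.operators import DummyOperator
> from operator_builders import add_tasks
> example_dag = DAG(dag_id='add_tasks_example', schedule_interval=timedelta(days=1),
>                   default_args={'owner': 'airflow', 'start_date': datetime(2016, 8, 8)})
> definitions = {
>     'first': {'operator_type': 'dummy_operator', 'operator_args': {}, 'depends_on': []},
>     'second': {'operator_type': 'dummy_operator', 'operator_args': {}, 'depends_on': ['first']},
> }
> # NOTE: add_tasks wires dependencies while it iterates, so 'first' has to exist in the
> # DAG before 'second' is processed; a plain dict does not guarantee that order.
> add_tasks(definitions, dag=example_dag, operator_builders={'dummy_operator': DummyOperator})
> {code}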



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
