[ https://issues.apache.org/jira/browse/AIRFLOW-392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Daniel Imberman closed AIRFLOW-392.
-----------------------------------
    Resolution: Auto Closed

> DAG runs on strange schedule in the past when deployed
> ------------------------------------------------------
>
>                 Key: AIRFLOW-392
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-392
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.7.1.3
>         Environment: AWS ElasticBeanstalk as a Docker application running in an Ubuntu-based container
>            Reporter: David Klosowski
>            Assignee: Norman Mu
>            Priority: Major
>
> Just deployed a new DAG ('weekly-no-track') that depends on 7 task runs of another DAG ('daily-no-track'). When the DAG is deployed, the scheduler schedules and runs multiple runs in the past (yesterday it ran for 6/12/2016 and 6/05/2016), despite the start date being set to the deployment date.
> It would be difficult to include all the code the DAG uses, since it references several Python libraries we've built that we want to eventually open source, but I've included some of it here. Let me know if anything is unclear and what I can do to help, or if any insight can be provided as to why this is occurring and how we might fix it.
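> For reference, the scheduling rule can be sketched as follows (a minimal illustration, not taken from the Airflow source, with hypothetical dates): the scheduler creates one run per fully elapsed schedule interval after start_date, and each run's execution_date marks the start of its interval, so an effective start_date in the past produces a series of "past" runs as soon as the DAG is picked up.
> {code}
> from __future__ import print_function
> from datetime import datetime, timedelta
>
> # Hypothetical effective start_date and interval, for illustration only.
> start_date = datetime(2016, 6, 5)
> interval = timedelta(days=7)
> now = datetime(2016, 6, 19)
>
> # One run per fully elapsed interval; execution_date is the interval's
> # start, so each run lands "in the past" relative to wall-clock time.
> execution_date = start_date
> while execution_date + interval <= now:
>     print('would run with execution_date =', execution_date)
>     execution_date += interval
> {code}
> The DAG definition as deployed: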
> {code}
> from __future__ import division, print_function
>
> from airflow.models import DAG
> from airflow.operators import DummyOperator, ExternalTaskSensor, TimeDeltaSensor
> from tn_etl_tools.aws.emr import EmrConfig, HiveConfig, read_cluster_templates
> from tn_etl_tools.aws.emr import EmrService, EmrServiceWrapper, HiveStepBuilder
> from tn_etl_tools.datesupport import ts_add
> from tn_etl_tools.hive import HivePartitions
> from tn_etl_tools.yaml import YamlLoader
> from datetime import datetime, timedelta
> from dateutil.relativedelta import relativedelta, SU, MO, TU, WE, TH, FR, SA
> from common_args import merge_dicts, CommonHiveParams
> from operator_builders import add_tasks, emr_hive_operator
> import os
>
> # === configs
> config_dir = os.getenv('DAG_CONFIG_DIR', '/usr/local/airflow/config')
> alert_email = os.getenv('AIRFLOW_TO_EMAIL')
> app_properties = YamlLoader.load_yaml(config_dir + '/app.yml')
> emr_cluster_properties = YamlLoader.load_yaml(config_dir + '/emr_clusters.yml')
> emr_config = EmrConfig.load(STAGE=app_properties['STAGE'], **app_properties['EMR'])
> hive_config = HiveConfig.load(STAGE=app_properties['STAGE'], **app_properties['HIVE'])
> emr_cluster_templates = read_cluster_templates(emr_cluster_properties)
> # === /configs
>
> # TODO: force execution_date = sunday?
> run_for_date = datetime(2016, 8, 8)
>
> emr_service = EmrService()
> emr_service_wrapper = EmrServiceWrapper(emr_service=emr_service,
>                                         emr_config=emr_config,
>                                         cluster_templates=emr_cluster_templates)
> hive_step_builder = HiveStepBuilder(hive_config=hive_config)
> hive_params = CommonHiveParams(app_properties_hive=app_properties['HIVE'])
>
> args = {'owner': 'airflow',
>         'depends_on_past': False,
>         'start_date': run_for_date,
>         'email': [alert_email],
>         'email_on_failure': True,
>         'email_on_retry': False,
>         'retries': 1,
>         'trigger_rule': 'all_success',
>         'emr_service_wrapper': emr_service_wrapper,
>         'hive_step_builder': hive_step_builder}
>
> user_defined_macros = {'hive_partitions': HivePartitions,
>                        'ts_add': ts_add}
> params = {'stage': app_properties['STAGE']}
>
> dag = DAG(dag_id='weekly_no_track', default_args=args,
>           user_defined_macros=user_defined_macros, params=params,
>           schedule_interval=timedelta(days=7),
>           max_active_runs=1)
>
> # === task definitions
> task_definitions = {
>     'wait-for-dailies': {
>         'operator_type': 'dummy_operator',  # hub for custom defined dependencies
>         'operator_args': {},
>         'depends_on': []
>     },
>     'weekly-no-track': {
>         'operator_type': 'emr_hive_operator',
>         'operator_args': {
>             'hive_step': {
>                 'script': 'weekly-no-track-airflow',  # temporarily modified script with separate output path
>                 'cluster_name': 'geoprofile',
>                 'script_vars': merge_dicts(hive_params.default_params(), hive_params.rundate_params(), {
>                     'PARTITIONS': '{{hive_partitions.by_day(ts_add(ts, days=-6), ts_add(ts, days=1))}}',
>                 }),
>             }
>         },
>         'depends_on': ['wait-for-dailies']
>     }
> }
> # === /task definitions
>
> operator_builders = {'emr_hive_operator': emr_hive_operator,
>                      'time_delta_sensor': TimeDeltaSensor,
>                      'dummy_operator': DummyOperator}
> add_tasks(task_definitions, dag=dag, operator_builders=operator_builders)
>
> # === custom tasks
> downstream_task = dag.get_task('wait-for-dailies')
> for weekday in [MO, TU, WE, TH, FR, SA, SU]:
>     task_id = 'wait-for-daily-{day}'.format(day=weekday)
>     # weekday(-1) resolves to the most recent such weekday, counting the date
>     # itself: if the date is already a Monday, for example, MO(-1) leaves it unchanged.
>     delta = relativedelta(weekday=weekday(-1))
>     sensor = ExternalTaskSensor(task_id=task_id, dag=dag,
>                                 external_dag_id='daily_no_track',
>                                 external_task_id='daily-no-track',
>                                 execution_delta=delta, timeout=86400)  # 86400 seconds = 24 hours
>     sensor.set_downstream(downstream_task)
> # === /custom tasks
> {code}
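> The execution_delta wiring above leans on dateutil's weekday arithmetic, as the comment in the loop describes; a quick standalone check of that behaviour (dates chosen purely for illustration):
> {code}
> from __future__ import print_function
> from datetime import datetime
> from dateutil.relativedelta import relativedelta, MO, SU
>
> # A hypothetical weekly execution_date that falls on a Monday.
> execution_date = datetime(2016, 8, 8)
>
> # weekday(-1) resolves to the most recent such weekday, counting the day itself:
> print(execution_date + relativedelta(weekday=MO(-1)))  # 2016-08-08 00:00:00 (already a Monday)
> print(execution_date + relativedelta(weekday=SU(-1)))  # 2016-08-07 00:00:00 (the Sunday before)
> {code}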
> """ > if len(dicts) == 0: > return {} > result = copy(dicts[0]) > for d in dicts[1:]: > result.update(d) > return result > {code} > {{operator_builders.py}} > {code} > """Functions for building operators from dict property definitions.""" > from __future__ import division, print_function > from tn_airflow_components.operators.emr import EmrHiveOperator, > create_emr_operator_with_step_sensor > # TODO: this should not be a single package. Not every DAG needs EMR as a > dependency, for example. > def emr_hive_operator(task_id, dag, hive_step, **kwargs): > return create_emr_operator_with_step_sensor(task_id=task_id, dag=dag, > > main_operator_class=EmrHiveOperator, main_operator_kwargs=hive_step, > **kwargs) > def add_tasks(task_definitions, dag, operator_builders): > """Add tasks from dict definitions > :param task_definitions: dict of task definitions. Keys in the top-level > dict are used as the task IDs > :type task_definitions: dict > :param dag: the DAG in which to define the tasks > :type dag: airflow.models.DAG > :param operator_builders: mapping of str 'operator_type' values to > operator builder functions > :type operator_builders: dict > """ > for task_id in task_definitions.keys(): > task_definition = task_definitions[task_id] > operator_type = task_definition['operator_type'] > operator = operator_builders[operator_type](task_id=task_id, dag=dag, > **task_definition['operator_args']) > if task_definition['depends_on']: > for dependency in task_definition['depends_on']: > operator.set_upstream(dag.get_task(dependency)) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)