Repository: incubator-airflow
Updated Branches:
  refs/heads/master 937142d29 -> 7fa86f72c


[AIRFLOW-673] Add operational metrics test for SchedulerJob

Extend SchedulerJob to instrument the execution
performance of task instances contained in each
DAG.
We want to know if any DAG is starved of resources,
and this will be reflected in the stats printed
out at the end of the test run.


This test instruments the operational impact of
https://github.com/apache/incubator-airflow/pull/1906.
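
For reference, the four metrics reduce to timestamp arithmetic on the
TaskInstance model. A minimal sketch (queued_dttm, start_date, end_date
and duration are real TaskInstance columns; executor_start is a stand-in
for the job's start time):

    def task_instance_metrics(ti, executor_start):
        # Seconds from executor start until the task instance was queued,
        # started and finished, plus the raw execution time.
        return {
            'queue_delay': (ti.queued_dttm - executor_start).total_seconds(),
            'start_delay': (ti.start_date - executor_start).total_seconds(),
            'land_time': (ti.end_date - executor_start).total_seconds(),
            'duration': ti.duration,
        }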

Closes #1919 from vijaysbhat/scheduler_perf_tool


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7fa86f72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7fa86f72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7fa86f72

Branch: refs/heads/master
Commit: 7fa86f72c6b6892bdc0621179cc01d67b2b59001
Parents: 937142d
Author: Vijay Bhat <vijaysb...@gmail.com>
Authored: Tue Jan 3 08:13:04 2017 -0500
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Jan 3 08:13:06 2017 -0500

----------------------------------------------------------------------
 scripts/perf/dags/perf_dag_1.py       |  45 +++++++
 scripts/perf/dags/perf_dag_2.py       |  45 +++++++
 scripts/perf/scheduler_ops_metrics.py | 187 +++++++++++++++++++++++++++++
 3 files changed, 277 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7fa86f72/scripts/perf/dags/perf_dag_1.py
----------------------------------------------------------------------
diff --git a/scripts/perf/dags/perf_dag_1.py b/scripts/perf/dags/perf_dag_1.py
new file mode 100644
index 0000000..d97c830
--- /dev/null
+++ b/scripts/perf/dags/perf_dag_1.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.operators.bash_operator import BashOperator
+from airflow.models import DAG
+from datetime import datetime, timedelta
+
+five_days_ago = datetime.combine(datetime.today() - timedelta(5),
+                                 datetime.min.time())
+args = {
+    'owner': 'airflow',
+    'start_date': five_days_ago,
+}
+
+dag = DAG(
+    dag_id='perf_dag_1', default_args=args,
+    schedule_interval='@daily',
+    dagrun_timeout=timedelta(minutes=60))
+
+task_1 = BashOperator(
+    task_id='perf_task_1',
+    bash_command='sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
+    dag=dag)
+
+for i in range(2, 5):
+    task = BashOperator(
+        task_id='perf_task_{}'.format(i),
+        bash_command='''
+            sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"
+        ''',
+        dag=dag)
+    task.set_upstream(task_1)
+
+if __name__ == "__main__":
+    dag.cli()

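The bash_command strings above are Jinja templates; Airflow fills in
run_id and dag_run from the task context at runtime. A standalone sketch
of the rendering, using plain jinja2 with made-up values:

    from jinja2 import Template

    cmd = Template(
        'sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"')
    # Hypothetical values; Airflow supplies the real ones per DagRun.
    print(cmd.render(run_id='scheduled__2017-01-01T00:00:00',
                     dag_run='<DagRun perf_dag_1 @ 2017-01-01 00:00:00>'))
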
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7fa86f72/scripts/perf/dags/perf_dag_2.py
----------------------------------------------------------------------
diff --git a/scripts/perf/dags/perf_dag_2.py b/scripts/perf/dags/perf_dag_2.py
new file mode 100644
index 0000000..cccd547
--- /dev/null
+++ b/scripts/perf/dags/perf_dag_2.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.operators.bash_operator import BashOperator
+from airflow.models import DAG
+from datetime import datetime, timedelta
+
+five_days_ago = datetime.combine(datetime.today() - timedelta(5),
+                                 datetime.min.time())
+args = {
+    'owner': 'airflow',
+    'start_date': five_days_ago,
+}
+
+dag = DAG(
+    dag_id='perf_dag_2', default_args=args,
+    schedule_interval='@daily',
+    dagrun_timeout=timedelta(minutes=60))
+
+task_1 = BashOperator(
+    task_id='perf_task_1',
+    bash_command='sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
+    dag=dag)
+
+for i in range(2, 5):
+    task = BashOperator(
+        task_id='perf_task_{}'.format(i),
+        bash_command='''
+            sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"
+        ''',
+        dag=dag)
+    task.set_upstream(task_1)
+
+if __name__ == "__main__":
+    dag.cli()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7fa86f72/scripts/perf/scheduler_ops_metrics.py
----------------------------------------------------------------------
diff --git a/scripts/perf/scheduler_ops_metrics.py b/scripts/perf/scheduler_ops_metrics.py
new file mode 100644
index 0000000..40e1b36
--- /dev/null
+++ b/scripts/perf/scheduler_ops_metrics.py
@@ -0,0 +1,187 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+import logging
+import pandas as pd
+import sys
+
+from airflow import configuration, settings
+from airflow.jobs import SchedulerJob
+from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.utils.state import State
+
+SUBDIR = 'scripts/perf/dags'
+DAG_IDS = ['perf_dag_1', 'perf_dag_2']
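+# Wall-clock budget for the test: heartbeat prints whatever stats are
+# available and exits once this many seconds have elapsed.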
+MAX_RUNTIME_SECS = 6
+
+
+class SchedulerMetricsJob(SchedulerJob):
+    """
+    This class extends SchedulerJob to instrument the execution performance of
+    task instances contained in each DAG. We want to know if any DAG
+    is starved of resources, and this will be reflected in the stats printed
+    out at the end of the test run. The following metrics will be instrumented
+    for each task instance (dag_id, task_id, execution_date) tuple:
+
+    1. Queuing delay - time from starting the executor until the task
+       instance is added to the executor queue.
+    2. Start delay - time from starting the executor until the task instance
+       starts execution.
+    3. Land time - time from starting the executor until the task instance
+       completes.
+    4. Duration - time taken to execute the task instance.
+
+    The DAGs implement bash operators that call the system sleep command. This
+    is representative of typical operators run on Airflow - queries that are
+    run on remote systems and spend the majority of their time on I/O wait.
+
+    To Run:
+        $ python scripts/perf/scheduler_ops_metrics.py
+    """
+    __mapper_args__ = {
+        'polymorphic_identity': 'SchedulerMetricsJob'
+    }
+
+    def print_stats(self):
+        """
+        Print operational metrics for the scheduler test.
+        """
+        session = settings.Session()
+        TI = TaskInstance
+        tis = (
+            session
+            .query(TI)
+            .filter(TI.dag_id.in_(DAG_IDS))
+            .all()
+        )
+        successful_tis = list(filter(lambda x: x.state == State.SUCCESS, tis))
+        ti_perf = [(ti.dag_id, ti.task_id, ti.execution_date,
+                    (ti.queued_dttm - self.start_date).total_seconds(),
+                    (ti.start_date - self.start_date).total_seconds(),
+                    (ti.end_date - self.start_date).total_seconds(),
+                    ti.duration) for ti in successful_tis]
+        ti_perf_df = pd.DataFrame(ti_perf, columns=['dag_id', 'task_id',
+                                                    'execution_date',
+                                                    'queue_delay',
+                                                    'start_delay', 'land_time',
+                                                    'duration'])
+
+        print('Performance Results')
+        print('###################')
+        for dag_id in DAG_IDS:
+            print('DAG {}'.format(dag_id))
+            print(ti_perf_df[ti_perf_df['dag_id'] == dag_id])
+        print('###################')
+        if len(tis) > len(successful_tis):
+            print("WARNING!! The following task instances haven't completed")
+            print(pd.DataFrame([(ti.dag_id, ti.task_id, ti.execution_date, ti.state)
+                  for ti in filter(lambda x: x.state != State.SUCCESS, tis)],
+                  columns=['dag_id', 'task_id', 'execution_date', 'state']))
+
+        session.commit()
+
+    def heartbeat(self):
+        """
+        Override the scheduler heartbeat to determine when the test is complete.
+        """
+        super(SchedulerMetricsJob, self).heartbeat()
+        session = settings.Session()
+        # Get all the relevant task instances
+        TI = TaskInstance
+        successful_tis = (
+            session
+            .query(TI)
+            .filter(TI.dag_id.in_(DAG_IDS))
+            .filter(TI.state.in_([State.SUCCESS]))
+            .all()
+        )
+        session.commit()
+
+        dagbag = DagBag(SUBDIR)
+        dags = [dagbag.dags[dag_id] for dag_id in DAG_IDS]
+        # The tasks in perf_dag_1 and perf_dag_2 have a daily schedule interval.
+        num_task_instances = sum([(datetime.today() - task.start_date).days
+                                 for dag in dags for task in dag.tasks])
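+        # E.g. with start_date five days ago, 4 tasks per DAG and 2 DAGs,
+        # this expects 2 * 4 * 5 = 40 task instances in total.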
+
+        if (len(successful_tis) == num_task_instances or
+                (datetime.now()-self.start_date).total_seconds() >
+                MAX_RUNTIME_SECS):
+            if (len(successful_tis) == num_task_instances):
+                self.logger.info("All tasks processed! Printing stats.")
+            else:
+                self.logger.info("Test timeout reached. "
+                                 "Printing available stats.")
+            self.print_stats()
+            set_dags_paused_state(True)
+            sys.exit()
+
+
+def clear_dag_runs():
+    """
+    Remove any existing DAG runs for the perf test DAGs.
+    """
+    session = settings.Session()
+    drs = session.query(DagRun).filter(
+        DagRun.dag_id.in_(DAG_IDS),
+    ).all()
+    for dr in drs:
+        logging.info('Deleting DagRun :: {}'.format(dr))
+        session.delete(dr)
+    session.commit()
+
+
+def clear_dag_task_instances():
+    """
+    Remove any existing task instances for the perf test DAGs.
+    """
+    session = settings.Session()
+    TI = TaskInstance
+    tis = (
+        session
+        .query(TI)
+        .filter(TI.dag_id.in_(DAG_IDS))
+        .all()
+    )
+    for ti in tis:
+        logging.info('Deleting TaskInstance :: {}'.format(ti))
+        session.delete(ti)
+    session.commit()
+
+
+def set_dags_paused_state(is_paused):
+    """
+    Toggle the pause state of the DAGs in the test.
+    """
+    session = settings.Session()
+    dms = session.query(DagModel).filter(
+        DagModel.dag_id.in_(DAG_IDS))
+    for dm in dms:
+        logging.info('Setting DAG :: {} is_paused={}'.format(dm, is_paused))
+        dm.is_paused = is_paused
+    session.commit()
+
+
+def main():
+    configuration.load_test_config()
+
+    set_dags_paused_state(False)
+    clear_dag_runs()
+    clear_dag_task_instances()
+
+    job = SchedulerMetricsJob(dag_ids=DAG_IDS, subdir=SUBDIR)
+    job.run()
+
+
+if __name__ == "__main__":
+    main()

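As a possible follow-up to print_stats, the per-task frame can be rolled
up per DAG to spot starvation at a glance. A sketch, assuming the same
columns as ti_perf_df above (the summarize helper is illustrative, not
part of this patch):

    def summarize(ti_perf_df):
        # Mean and max of each metric per DAG; a starved DAG shows up
        # with outsized queue_delay and start_delay.
        metrics = ['queue_delay', 'start_delay', 'land_time', 'duration']
        return ti_perf_df.groupby('dag_id')[metrics].agg(['mean', 'max'])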