This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-1-test by this push:
     new 19f32fd  Improve dag/task concurrency check (#17786)
19f32fd is described below

commit 19f32fd662a9d60a1271c92c2bca28313a500697
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Mon Aug 23 15:58:55 2021 +0100

    Improve dag/task concurrency check (#17786)
    
    Currently, tasks can be run even if the dagrun is queued. Task instances of
    queued dagruns should only be run when the dagrun is in the running state.
    This PR makes sure TIs of queued dagruns are not run, thereby properly
    checking task concurrency.
    
    Also, we check max_active_runs when parsing a dag, which is no longer
    needed since dagruns are created in the queued state and the scheduler
    controls when to change the queued dagruns to running, taking
    max_active_runs into account.
    This PR also removes the max_active_runs check performed during dag parsing.
    
    (cherry picked from commit ffb81eae610f738fd45c88cdb27d601c0edf24fa)
---
 airflow/jobs/scheduler_job.py    |  5 ++---
 airflow/models/dag.py            | 26 +-------------------------
 tests/jobs/test_scheduler_job.py | 31 +++++++++++++++++++++++++++++++
 tests/models/test_dag.py         | 14 +++++++-------
 4 files changed, 41 insertions(+), 35 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 18ec981..8a625b9 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -323,6 +323,7 @@ class SchedulerJob(BaseJob):
             session.query(TI)
             .outerjoin(TI.dag_run)
             .filter(or_(DR.run_id.is_(None), DR.run_type != 
DagRunType.BACKFILL_JOB))
+            .filter(or_(DR.state.is_(None), DR.state != DagRunState.QUEUED))
             .join(TI.dag_model)
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
@@ -1011,9 +1012,7 @@ class SchedulerJob(BaseJob):
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-            dag_model.next_dagrun, dag_model.next_dagrun_create_after = 
dag.next_dagrun_info(
-                dag_model.next_dagrun
-            )
+            dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun)
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep 
lots of state/object in
         # memory for larger dags? or expunge_all()
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 2e66b40..74f985f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1867,18 +1867,6 @@ class DAG(LoggingMixin):
             .all()
         )
 
-        # Get number of active dagruns for all dags we are processing as a 
single query.
-        num_active_runs = dict(
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(
-                DagRun.dag_id.in_(existing_dag_ids),
-                DagRun.state == State.RUNNING,
-                DagRun.external_trigger.is_(False),
-            )
-            .group_by(DagRun.dag_id)
-            .all()
-        )
-
         for orm_dag in sorted(orm_dags, key=lambda d: d.dag_id):
             dag = dag_by_ids[orm_dag.dag_id]
             if dag.is_subdag:
@@ -1901,7 +1889,6 @@ class DAG(LoggingMixin):
             orm_dag.calculate_dagrun_date_fields(
                 dag,
                 most_recent_dag_runs.get(dag.dag_id),
-                num_active_runs.get(dag.dag_id, 0),
             )
 
             for orm_tag in list(orm_dag.tags):
@@ -2282,27 +2269,16 @@ class DagModel(Base):
         return with_row_locks(query, of=cls, session=session, 
**skip_locked(session=session))
 
     def calculate_dagrun_date_fields(
-        self, dag: DAG, most_recent_dag_run: Optional[pendulum.DateTime], 
active_runs_of_dag: int
+        self, dag: DAG, most_recent_dag_run: Optional[pendulum.DateTime]
     ) -> None:
         """
         Calculate ``next_dagrun`` and `next_dagrun_create_after``
 
         :param dag: The DAG object
         :param most_recent_dag_run: DateTime of most recent run of this dag, 
or none if not yet scheduled.
-        :param active_runs_of_dag: Number of currently active runs of this dag
         """
         self.next_dagrun, self.next_dagrun_create_after = 
dag.next_dagrun_info(most_recent_dag_run)
 
-        if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
-            # Since this happens every time the dag is parsed it would be 
quite spammy at info
-            log.debug(
-                "DAG %s is at (or above) max_active_runs (%d of %d), not 
creating any more runs",
-                dag.dag_id,
-                active_runs_of_dag,
-                dag.max_active_runs,
-            )
-            self.next_dagrun_create_after = None
-
         log.info("Setting next_dagrun for %s to %s", dag.dag_id, 
self.next_dagrun)
 
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5de365e..f02ecfa 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -799,6 +799,37 @@ class TestSchedulerJob(unittest.TestCase):
         assert 0 == 
len(self.scheduler_job._executable_task_instances_to_queued(max_tis=32, 
session=session))
         session.rollback()
 
+    def test_tis_for_queued_dagruns_are_not_run(self, dag_maker):
+        """
+        This tests that tis from queued dagruns are not queued
+        """
+        dag_id = "test_tis_for_queued_dagruns_are_not_run"
+        task_id_1 = 'dummy'
+
+        with dag_maker(dag_id) as dag:
+            task1 = DummyOperator(task_id=task_id_1)
+        dr1 = dag_maker.create_dagrun(state=State.QUEUED)
+        dr2 = dag_maker.create_dagrun(
+            run_id='test2', 
execution_date=dag.following_schedule(dr1.execution_date)
+        )
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+        ti1 = TaskInstance(task1, dr1.execution_date)
+        ti2 = TaskInstance(task1, dr2.execution_date)
+        ti1.state = State.SCHEDULED
+        ti2.state = State.SCHEDULED
+        session.merge(ti1)
+        session.merge(ti2)
+        session.flush()
+        res = 
self.scheduler_job._executable_task_instances_to_queued(max_tis=32, 
session=session)
+
+        assert 1 == len(res)
+        assert ti2.key == res[0].key
+        ti1.refresh_from_db()
+        ti2.refresh_from_db()
+        assert ti1.state == State.SCHEDULED
+        assert ti2.state == State.QUEUED
+
     def test_find_executable_task_instances_concurrency(self):
         dag_id = 
'SchedulerJobTest.test_find_executable_task_instances_concurrency'
         task_id_1 = 'dummy'
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 34d761d..118dec4 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -674,7 +674,7 @@ class TestDag(unittest.TestCase):
         clear_db_dags()
         dags = [DAG(f'dag-bulk-sync-{i}', start_date=DEFAULT_DATE, 
tags=["test-dag"]) for i in range(0, 4)]
 
-        with assert_queries_count(5):
+        with assert_queries_count(4):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 
'dag-bulk-sync-3'} == {
@@ -691,14 +691,14 @@ class TestDag(unittest.TestCase):
                 assert row[0] is not None
 
         # Re-sync should do fewer queries
-        with assert_queries_count(4):
+        with assert_queries_count(3):
             DAG.bulk_write_to_db(dags)
-        with assert_queries_count(4):
+        with assert_queries_count(3):
             DAG.bulk_write_to_db(dags)
         # Adding tags
         for dag in dags:
             dag.tags.append("test-dag2")
-        with assert_queries_count(5):
+        with assert_queries_count(4):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 
'dag-bulk-sync-3'} == {
@@ -717,7 +717,7 @@ class TestDag(unittest.TestCase):
         # Removing tags
         for dag in dags:
             dag.tags.remove("test-dag")
-        with assert_queries_count(5):
+        with assert_queries_count(4):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 
'dag-bulk-sync-3'} == {
@@ -764,8 +764,8 @@ class TestDag(unittest.TestCase):
 
         model = session.query(DagModel).get((dag.dag_id,))
         assert model.next_dagrun == period_end
-        # We signal "at max active runs" by saying this run is never eligible 
to be created
-        assert model.next_dagrun_create_after is None
+        # Next dagrun after is not None because the dagrun would be in queued 
state
+        assert model.next_dagrun_create_after is not None
 
     def test_sync_to_db(self):
         dag = DAG(

Reply via email to