This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3ca09b92e0fa1f8e961ab634baedfc15296899c5
Author: Kamil Breguła <mik-...@users.noreply.github.com>
AuthorDate: Mon Feb 24 16:15:38 2020 +0100

    [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)
    
    (cherry picked from commit 83d826b9925ce0eb2bd1fe403f5151fbef310b63)
---
 airflow/utils/dag_processing.py | 129 ++++++++++++++++++++--------------------
 1 file changed, 63 insertions(+), 66 deletions(-)
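
Before the diff itself: the change dissolves DagFileProcessorManager.heartbeat() into the manager's parsing loop, splits its work into two smaller methods, prepare_file_path_queue() and start_new_processes(), and renames the loop counter from _heartbeat_count to _num_run. As a reading aid, here is a condensed sketch of the loop body after the commit (assembled from the additions below; the sync-mode guard and surrounding control flow are elided):

    # Condensed sketch of the parsing-loop body after this commit; the
    # sync-mode SQLite guard and loop plumbing from the diff are elided.
    self._refresh_dag_dir()
    self._find_zombies()

    self._kill_timed_out_processors()
    simple_dags = self.collect_results()

    # Refill the queue only after every queued file has been processed.
    if not self._file_path_queue:
        self.emit_metrics()
        self.prepare_file_path_queue()

    self.start_new_processes()

    # Count loop iterations (replaces the old heartbeat count).
    self._num_run += 1

    for simple_dag in simple_dags:
        self._signal_conn.send(simple_dag)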

diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 3aac8fd..e4ecc29 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -763,7 +763,7 @@ class DagFileProcessorManager(LoggingMixin):
         # Map from file path to the processor
         self._processors = {}
 
-        self._heartbeat_count = 0
+        self._num_run = 0
 
         # Map from file path to stats about the file
         self._file_stats = {}  # type: dict(str, DagFileStat)
@@ -843,11 +843,24 @@ class DagFileProcessorManager(LoggingMixin):
                 # are told to (as that would open another connection to the
                # SQLite DB which isn't a good practice)
                 continue
-
+            # pylint: enable=no-else-break
             self._refresh_dag_dir()
-            self._find_zombies()
+            self._find_zombies()  # pylint: disable=no-value-for-parameter
+
+            self._kill_timed_out_processors()
+            simple_dags = self.collect_results()
+
+            # Generate more file paths to process if we processed all the files
+            # already.
+            if not self._file_path_queue:
+                self.emit_metrics()
+                self.prepare_file_path_queue()
+
+            self.start_new_processes()
+
+            # Update the number of loop iterations.
+            self._num_run += 1
 
-            simple_dags = self.heartbeat()
             for simple_dag in simple_dags:
                 self._signal_conn.send(simple_dag)
 
@@ -1197,65 +1210,11 @@ class DagFileProcessorManager(LoggingMixin):
 
         return simple_dags
 
-    def heartbeat(self):
-        """
-        This should be periodically called by the manager loop. This method will
-        kick off new processes to process DAG definition files and read the
-        results from the finished processors.
-
-        :return: a list of SimpleDags that were produced by processors that
-            have finished since the last time this was called
-        :rtype: list[airflow.utils.dag_processing.SimpleDag]
+    def start_new_processes(self):
+        """"
+        Start more processors if we have enough slots and files to process
         """
-        simple_dags = self.collect_results()
-
-        # Generate more file paths to process if we processed all the files
-        # already.
-        if len(self._file_path_queue) == 0:
-            self.emit_metrics()
-
-            self._parsing_start_time = timezone.utcnow()
-            # If the file path is already being processed, or if a file was
-            # processed recently, wait until the next batch
-            file_paths_in_progress = self._processors.keys()
-            now = timezone.utcnow()
-            file_paths_recently_processed = []
-            for file_path in self._file_paths:
-                last_finish_time = self.get_last_finish_time(file_path)
-                if (last_finish_time is not None and
-                    (now - last_finish_time).total_seconds() <
-                        self._file_process_interval):
-                    file_paths_recently_processed.append(file_path)
-
-            files_paths_at_run_limit = [file_path
-                                        for file_path, stat in self._file_stats.items()
-                                        if stat.run_count == self._max_runs]
-
-            files_paths_to_queue = list(set(self._file_paths) -
-                                        set(file_paths_in_progress) -
-                                        set(file_paths_recently_processed) -
-                                        set(files_paths_at_run_limit))
-
-            for file_path, processor in self._processors.items():
-                self.log.debug(
-                    "File path %s is still being processed (started: %s)",
-                    processor.file_path, processor.start_time.isoformat()
-                )
-
-            self.log.debug(
-                "Queuing the following files for processing:\n\t%s",
-                "\n\t".join(files_paths_to_queue)
-            )
-
-            for file_path in files_paths_to_queue:
-                if file_path not in self._file_stats:
-                    self._file_stats[file_path] = DagFileStat(0, 0, None, None, 0)
-
-            self._file_path_queue.extend(files_paths_to_queue)
-
-        # Start more processors if we have enough slots and files to process
-        while (self._parallelism - len(self._processors) > 0 and
-               len(self._file_path_queue) > 0):
+        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
             processor = self._processor_factory(file_path, self._zombies)
             Stats.incr('dag_processing.processes')
@@ -1267,10 +1226,48 @@ class DagFileProcessorManager(LoggingMixin):
             )
             self._processors[file_path] = processor
 
-        # Update heartbeat count.
-        self._heartbeat_count += 1
+    def prepare_file_path_queue(self):
+        """
+        Generate more file paths to process. Results are saved in _file_path_queue.
+        """
+        self._parsing_start_time = timezone.utcnow()
+        # If the file path is already being processed, or if a file was
+        # processed recently, wait until the next batch
+        file_paths_in_progress = self._processors.keys()
+        now = timezone.utcnow()
+        file_paths_recently_processed = []
+        for file_path in self._file_paths:
+            last_finish_time = self.get_last_finish_time(file_path)
+            if (last_finish_time is not None and
+                (now - last_finish_time).total_seconds() <
+                    self._file_process_interval):
+                file_paths_recently_processed.append(file_path)
+
+        files_paths_at_run_limit = [file_path
+                                    for file_path, stat in self._file_stats.items()
+                                    if stat.run_count == self._max_runs]
+
+        files_paths_to_queue = list(set(self._file_paths) -
+                                    set(file_paths_in_progress) -
+                                    set(file_paths_recently_processed) -
+                                    set(files_paths_at_run_limit))
 
-        return simple_dags
+        for file_path, processor in self._processors.items():
+            self.log.debug(
+                "File path %s is still being processed (started: %s)",
+                processor.file_path, processor.start_time.isoformat()
+            )
+
+        self.log.debug(
+            "Queuing the following files for processing:\n\t%s",
+            "\n\t".join(files_paths_to_queue)
+        )
+
+        for file_path in files_paths_to_queue:
+            if file_path not in self._file_stats:
+                self._file_stats[file_path] = DagFileStat(0, 0, None, None, 0)
+
+        self._file_path_queue.extend(files_paths_to_queue)
 
     @provide_session
     def _find_zombies(self, session):
@@ -1338,7 +1335,7 @@ class DagFileProcessorManager(LoggingMixin):
         for stat in self._file_stats.values():
             if stat.run_count < self._max_runs:
                 return False
-        if self._heartbeat_count < self._max_runs:
+        if self._num_run < self._max_runs:
             return False
         return True
 

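For readers who want the queue-refill rule without the diff noise: prepare_file_path_queue() selects paths by plain set arithmetic, i.e. every known file minus those with a running processor, minus those that finished less than _file_process_interval seconds ago, minus those whose run_count already equals _max_runs. A standalone illustration (the file names and set memberships are invented for the example, not taken from the commit):

    # Standalone illustration of the set arithmetic in prepare_file_path_queue();
    # the file names below are made up for demonstration purposes.
    all_paths = {"a.py", "b.py", "c.py", "d.py"}
    in_progress = {"a.py"}       # currently has a running processor
    recently_done = {"b.py"}     # finished < file_process_interval seconds ago
    at_run_limit = {"c.py"}      # stat.run_count == max_runs

    to_queue = all_paths - in_progress - recently_done - at_run_limit
    print(sorted(to_queue))      # ['d.py']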