This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3ca09b92e0fa1f8e961ab634baedfc15296899c5
Author: Kamil Breguła <mik-...@users.noreply.github.com>
AuthorDate: Mon Feb 24 16:15:38 2020 +0100

    [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)

    (cherry picked from commit 83d826b9925ce0eb2bd1fe403f5151fbef310b63)
---
 airflow/utils/dag_processing.py | 129 ++++++++++++++++++++--------------------
 1 file changed, 63 insertions(+), 66 deletions(-)

diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 3aac8fd..e4ecc29 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -763,7 +763,7 @@ class DagFileProcessorManager(LoggingMixin):
         # Map from file path to the processor
         self._processors = {}
 
-        self._heartbeat_count = 0
+        self._num_run = 0
 
         # Map from file path to stats about the file
         self._file_stats = {}  # type: dict(str, DagFileStat)
@@ -843,11 +843,24 @@ class DagFileProcessorManager(LoggingMixin):
                 # are told to (as that would open another connection to the
                 # SQLite DB which isn't a good practice
                 continue
-
+            # pylint: enable=no-else-break
             self._refresh_dag_dir()
-            self._find_zombies()
+            self._find_zombies()  # pylint: disable=no-value-for-parameter
+
+            self._kill_timed_out_processors()
+            simple_dags = self.collect_results()
+
+            # Generate more file paths to process if we processed all the files
+            # already.
+            if not self._file_path_queue:
+                self.emit_metrics()
+                self.prepare_file_path_queue()
+
+            self.start_new_processes()
+
+            # Update number of loop iteration.
+            self._num_run += 1
 
-            simple_dags = self.heartbeat()
             for simple_dag in simple_dags:
                 self._signal_conn.send(simple_dag)
@@ -1197,65 +1210,11 @@ class DagFileProcessorManager(LoggingMixin):
 
         return simple_dags
 
-    def heartbeat(self):
-        """
-        This should be periodically called by the manager loop. This method will
-        kick off new processes to process DAG definition files and read the
-        results from the finished processors.
-
-        :return: a list of SimpleDags that were produced by processors that
-            have finished since the last time this was called
-        :rtype: list[airflow.utils.dag_processing.SimpleDag]
+    def start_new_processes(self):
+        """"
+        Start more processors if we have enough slots and files to process
         """
-        simple_dags = self.collect_results()
-
-        # Generate more file paths to process if we processed all the files
-        # already.
-        if len(self._file_path_queue) == 0:
-            self.emit_metrics()
-
-            self._parsing_start_time = timezone.utcnow()
-            # If the file path is already being processed, or if a file was
-            # processed recently, wait until the next batch
-            file_paths_in_progress = self._processors.keys()
-            now = timezone.utcnow()
-            file_paths_recently_processed = []
-            for file_path in self._file_paths:
-                last_finish_time = self.get_last_finish_time(file_path)
-                if (last_finish_time is not None and
-                    (now - last_finish_time).total_seconds() <
-                        self._file_process_interval):
-                    file_paths_recently_processed.append(file_path)
-
-            files_paths_at_run_limit = [file_path
-                                        for file_path, stat in self._file_stats.items()
-                                        if stat.run_count == self._max_runs]
-
-            files_paths_to_queue = list(set(self._file_paths) -
-                                        set(file_paths_in_progress) -
-                                        set(file_paths_recently_processed) -
-                                        set(files_paths_at_run_limit))
-
-            for file_path, processor in self._processors.items():
-                self.log.debug(
-                    "File path %s is still being processed (started: %s)",
-                    processor.file_path, processor.start_time.isoformat()
-                )
-
-            self.log.debug(
-                "Queuing the following files for processing:\n\t%s",
-                "\n\t".join(files_paths_to_queue)
-            )
-
-            for file_path in files_paths_to_queue:
-                if file_path not in self._file_stats:
-                    self._file_stats[file_path] = DagFileStat(0, 0, None, None, 0)
-
-            self._file_path_queue.extend(files_paths_to_queue)
-
-        # Start more processors if we have enough slots and files to process
-        while (self._parallelism - len(self._processors) > 0 and
-               len(self._file_path_queue) > 0):
+        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
             processor = self._processor_factory(file_path, self._zombies)
             Stats.incr('dag_processing.processes')
@@ -1267,10 +1226,48 @@ class DagFileProcessorManager(LoggingMixin):
             )
             self._processors[file_path] = processor
 
-        # Update heartbeat count.
-        self._heartbeat_count += 1
+    def prepare_file_path_queue(self):
+        """
+        Generate more file paths to process. Result are saved in _file_path_queue.
+ """ + self._parsing_start_time = timezone.utcnow() + # If the file path is already being processed, or if a file was + # processed recently, wait until the next batch + file_paths_in_progress = self._processors.keys() + now = timezone.utcnow() + file_paths_recently_processed = [] + for file_path in self._file_paths: + last_finish_time = self.get_last_finish_time(file_path) + if (last_finish_time is not None and + (now - last_finish_time).total_seconds() < + self._file_process_interval): + file_paths_recently_processed.append(file_path) + + files_paths_at_run_limit = [file_path + for file_path, stat in self._file_stats.items() + if stat.run_count == self._max_runs] + + files_paths_to_queue = list(set(self._file_paths) - + set(file_paths_in_progress) - + set(file_paths_recently_processed) - + set(files_paths_at_run_limit)) - return simple_dags + for file_path, processor in self._processors.items(): + self.log.debug( + "File path %s is still being processed (started: %s)", + processor.file_path, processor.start_time.isoformat() + ) + + self.log.debug( + "Queuing the following files for processing:\n\t%s", + "\n\t".join(files_paths_to_queue) + ) + + for file_path in files_paths_to_queue: + if file_path not in self._file_stats: + self._file_stats[file_path] = DagFileStat(0, 0, None, None, 0) + + self._file_path_queue.extend(files_paths_to_queue) @provide_session def _find_zombies(self, session): @@ -1338,7 +1335,7 @@ class DagFileProcessorManager(LoggingMixin): for stat in self._file_stats.values(): if stat.run_count < self._max_runs: return False - if self._heartbeat_count < self._max_runs: + if self._num_run < self._max_runs: return False return True