[GitHub] [airflow] zhongjiajie commented on a change in pull request #7157: [AIRFLOW-6251] add config for max tasks per dag
zhongjiajie commented on a change in pull request #7157: [AIRFLOW-6251] add config for max tasks per dag
URL: https://github.com/apache/airflow/pull/7157#discussion_r31407

## File path: tests/models/test_dagbag.py ##

```diff
@@ -151,6 +151,42 @@ def test_zip(self):
         dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_zip.zip"))
         self.assertTrue(dagbag.get_dag("test_zip_dag"))
+    @conf_vars({('core', 'max_tasks_per_dag'): '5'})
+    def test_process_file_max_task_check(self):
+        """
+        test if num_tasks > max_tasks_per_dag can be identified
+        """
+        a_dag_id = "example_short_circuit_operator"
```

Review comment:

```suggestion
        a_dag_id = "test_example_bash_operator"
```

I took a quick look. `TEST_DAGS_FOLDER` points to `airflow/tests/models/../dags`, and that folder does not contain the DAG `example_short_circuit_operator`, so you cannot `process_file` `airflow/tests/models/../dags/example_short_circuit_operator.py`.

You could instead use another DAG from `airflow/tests/models/../dags` (the folder `TEST_DAGS_FOLDER` points to), such as `test_example_bash_operator`, and change `@conf_vars({('core', 'max_tasks_per_dag'): '5'})` to `@conf_vars({('core', 'max_tasks_per_dag'): '7'})`. The same applies to the other tests. @tooptoop4

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
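To make the path argument concrete, here is a minimal standard-library sketch. `resolve_test_dags_folder` and `dag_file_exists` are hypothetical helpers, not Airflow APIs, and the convention that a DAG is defined in a file named `<dag_id>.py` is an assumption made only for illustration.

```python
import os


def resolve_test_dags_folder(models_dir: str) -> str:
    """Hypothetical helper: normalize '<models_dir>/../dags'.

    A path such as 'airflow/tests/models/../dags' refers to 'airflow/tests/dags',
    so only files in that folder can be handed to process_file.
    """
    return os.path.normpath(os.path.join(models_dir, "..", "dags"))


def dag_file_exists(dags_folder: str, dag_id: str) -> bool:
    """Hypothetical check; ASSUMES the DAG lives in a file called '<dag_id>.py'."""
    return os.path.isfile(os.path.join(dags_folder, dag_id + ".py"))


if __name__ == "__main__":
    # On POSIX systems this prints: airflow/tests/dags
    print(resolve_test_dags_folder(os.path.join("airflow", "tests", "models")))
```

The helpers only inspect the filesystem; they do not import or parse any DAG file.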
zhongjiajie commented on a change in pull request #7157: [AIRFLOW-6251] add config for max tasks per dag
URL: https://github.com/apache/airflow/pull/7157#discussion_r30224

## File path: airflow/models/dagbag.py ##

```diff
@@ -300,26 +302,37 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
     dag.full_filepath = filepath
     if dag.fileloc != filepath and not is_zipfile:
         dag.fileloc = filepath
-try:
-    dag.is_subdag = False
-    self.bag_dag(dag, parent_dag=dag, root_dag=dag)
-    if isinstance(dag._schedule_interval, str):
-        croniter(dag._schedule_interval)
-    found_dags.append(dag)
-    found_dags += dag.subdags
-except (CroniterBadCronError,
-        CroniterBadDateError,
-        CroniterNotAlphaError) as cron_e:
-    self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
+
+num_tasks = len(dag.tasks)
+if max_tasks_per_dag > 0 and num_tasks > max_tasks_per_dag:
```

Review comment:

But the code base already has some examples of this, and IDEs like PyCharm will suggest changing it to `var1 < var2 < var3`.
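The PyCharm hint relies on Python's chained comparisons: `a < b < c` is evaluated as `a < b and b < c`. A quick brute-force comparison over a small, arbitrarily chosen range of integers illustrates that the condition from the diff and the chained form agree; the function names are hypothetical and exist only for this check.

```python
def original_check(max_tasks_per_dag: int, num_tasks: int) -> bool:
    # Condition as written in the diff.
    return max_tasks_per_dag > 0 and num_tasks > max_tasks_per_dag


def chained_check(max_tasks_per_dag: int, num_tasks: int) -> bool:
    # Chained form: evaluated as (0 < max_tasks_per_dag) and (max_tasks_per_dag < num_tasks).
    return 0 < max_tasks_per_dag < num_tasks


def forms_agree(limit: int = 20) -> bool:
    # Compare both forms on every (max_tasks_per_dag, num_tasks) pair in [-limit, limit].
    values = range(-limit, limit + 1)
    return all(
        original_check(m, n) == chained_check(m, n)
        for m in values
        for n in values
    )


print(forms_agree())  # True
```

For integers the two forms are logically identical; the loop is just a cheap sanity check.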
zhongjiajie commented on a change in pull request #7157: [AIRFLOW-6251] add config for max tasks per dag
URL: https://github.com/apache/airflow/pull/7157#discussion_r366360841

## File path: airflow/models/dagbag.py ##

```diff
+if max_tasks_per_dag > 0 and num_tasks > max_tasks_per_dag:
```

Review comment:

Really? But I remember we had a PR to do that. I will take a look.
zhongjiajie commented on a change in pull request #7157: [AIRFLOW-6251] add config for max tasks per dag
URL: https://github.com/apache/airflow/pull/7157#discussion_r366363132

## File path: airflow/models/dagbag.py ##

```diff
+if max_tasks_per_dag > 0 and num_tasks > max_tasks_per_dag:
```

Review comment:

@ashb After a quick look: there is already an example in https://github.com/apache/airflow/blob/4a344f13d26ecbb627bb9968895b290bfd86e4da/airflow/cli/commands/webserver_command.py#L90, and it works as expected:

```py
In [1]: x = 0

In [2]: num_task = 10

In [3]: 0 < x < num_task
Out[3]: False

In [4]: 0 <= x < num_task
Out[4]: True
```
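The Python language reference also specifies that in `a < b < c` the middle operand is evaluated only once, and `c` is not evaluated at all when `a < b` is already false. A small sketch with a hypothetical call-recording helper (`make_recorder` is illustrative, not part of Airflow) makes both points visible:

```python
def make_recorder():
    """Return a hypothetical helper that records every value it produces."""
    calls = []

    def produce(value):
        calls.append(value)
        return value

    return produce, calls


def chained_calls():
    produce, calls = make_recorder()
    result = 0 < produce(5) < produce(10)  # middle operand evaluated once
    return result, calls


def explicit_calls():
    produce, calls = make_recorder()
    result = 0 < produce(5) and produce(5) < produce(10)  # middle operand evaluated twice
    return result, calls


def short_circuit_calls():
    produce, calls = make_recorder()
    # 0 < -1 is False, so the right-most operand is never evaluated.
    result = 0 < produce(-1) < produce(10)
    return result, calls


print(chained_calls())        # (True, [5, 10])
print(explicit_calls())       # (True, [5, 5, 10])
print(short_circuit_calls())  # (False, [-1])
```

For plain integer variables like `max_tasks_per_dag` this makes no observable difference; it matters only when the middle operand is an expensive or side-effecting expression.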
zhongjiajie commented on a change in pull request #7157: [AIRFLOW-6251] add config for max tasks per dag
URL: https://github.com/apache/airflow/pull/7157#discussion_r366241733

## File path: airflow/models/dagbag.py ##

```diff
+if max_tasks_per_dag > 0 and num_tasks > max_tasks_per_dag:
```

Review comment:

```suggestion
if 0 < max_tasks_per_dag < num_tasks:
```
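A minimal sketch of how the suggested condition behaves at its boundaries. `too_many_tasks` is a hypothetical helper, not code from the PR; it only assumes, as the `max_tasks_per_dag > 0` guard in the diff implies, that a non-positive limit turns the check off.

```python
def too_many_tasks(num_tasks: int, max_tasks_per_dag: int) -> bool:
    """Hypothetical helper using the suggested chained comparison.

    A limit of 0 (or below) disables the check, and a DAG with exactly
    max_tasks_per_dag tasks is still allowed because the comparison is strict.
    """
    return 0 < max_tasks_per_dag < num_tasks


if __name__ == "__main__":
    for num_tasks, limit in [(10, 0), (5, 5), (6, 5), (3, 7)]:
        # Prints: "10 0 False", "5 5 False", "6 5 True", "3 7 False"
        print(num_tasks, limit, too_many_tasks(num_tasks, limit))
```

Because the comparison is strict, a test that wants to trigger the check needs a DAG with more tasks than the configured limit, not merely as many.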