[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r367161194 ## File path: airflow/jobs/scheduler_job.py ## @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, :param simple_dag_bag: Should contains all of the task_instances' dags :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag """ -TI = models.TaskInstance # actually enqueue them -for simple_task_instance in simple_task_instances: -simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id) -command = TI.generate_command( -simple_task_instance.dag_id, -simple_task_instance.task_id, -simple_task_instance.execution_date, +for ti in task_instances: +simple_dag = simple_dag_bag.get_dag(ti.dag_id) +command = ti.generate_command( +ti.dag_id, +ti.task_id, +ti.execution_date, local=True, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, -pool=simple_task_instance.pool, +pool=ti.pool, file_path=simple_dag.full_filepath, pickle_id=simple_dag.pickle_id) -priority = simple_task_instance.priority_weight -queue = simple_task_instance.queue +priority = ti.priority_weight +queue = ti.queue self.log.info( "Sending %s to executor with priority %s and queue %s", -simple_task_instance.key, priority, queue +ti.key, priority, queue ) self.executor.queue_command( Review comment: I've put a TODO comment in here. Not happy about doing that but I think this PR is big enough already. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r367156221 ## File path: airflow/jobs/scheduler_job.py ## @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, :param simple_dag_bag: Should contains all of the task_instances' dags :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag """ -TI = models.TaskInstance # actually enqueue them -for simple_task_instance in simple_task_instances: -simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id) -command = TI.generate_command( -simple_task_instance.dag_id, -simple_task_instance.task_id, -simple_task_instance.execution_date, +for ti in task_instances: +simple_dag = simple_dag_bag.get_dag(ti.dag_id) +command = ti.generate_command( +ti.dag_id, +ti.task_id, +ti.execution_date, local=True, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, -pool=simple_task_instance.pool, +pool=ti.pool, file_path=simple_dag.full_filepath, pickle_id=simple_dag.pickle_id) -priority = simple_task_instance.priority_weight -queue = simple_task_instance.queue +priority = ti.priority_weight +queue = ti.queue self.log.info( "Sending %s to executor with priority %s and queue %s", -simple_task_instance.key, priority, queue +ti.key, priority, queue ) self.executor.queue_command( Review comment: Fixing this properly will also fix https://issues.apache.org/jira/browse/AIRFLOW-3877
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r367154648 ## File path: airflow/jobs/scheduler_job.py ## @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, :param simple_dag_bag: Should contains all of the task_instances' dags :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag """ -TI = models.TaskInstance # actually enqueue them -for simple_task_instance in simple_task_instances: -simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id) -command = TI.generate_command( -simple_task_instance.dag_id, -simple_task_instance.task_id, -simple_task_instance.execution_date, +for ti in task_instances: +simple_dag = simple_dag_bag.get_dag(ti.dag_id) +command = ti.generate_command( +ti.dag_id, +ti.task_id, +ti.execution_date, local=True, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, -pool=simple_task_instance.pool, +pool=ti.pool, file_path=simple_dag.full_filepath, pickle_id=simple_dag.pickle_id) -priority = simple_task_instance.priority_weight -queue = simple_task_instance.queue +priority = ti.priority_weight +queue = ti.queue self.log.info( "Sending %s to executor with priority %s and queue %s", -simple_task_instance.key, priority, queue +ti.key, priority, queue ) self.executor.queue_command( Review comment: This is actually more complex than it seems -- calling queue_task_instance sends it via `ti.command_as_list`, but to get the same behaviour with filepath we'd need to have ti.task.dag be set, but the ti doesn't have a task set.
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r367151692 ## File path: airflow/jobs/scheduler_job.py ## @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, :param simple_dag_bag: Should contains all of the task_instances' dags :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag """ -TI = models.TaskInstance # actually enqueue them -for simple_task_instance in simple_task_instances: -simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id) -command = TI.generate_command( -simple_task_instance.dag_id, -simple_task_instance.task_id, -simple_task_instance.execution_date, +for ti in task_instances: +simple_dag = simple_dag_bag.get_dag(ti.dag_id) +command = ti.generate_command( +ti.dag_id, +ti.task_id, +ti.execution_date, local=True, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, -pool=simple_task_instance.pool, +pool=ti.pool, file_path=simple_dag.full_filepath, pickle_id=simple_dag.pickle_id) -priority = simple_task_instance.priority_weight -queue = simple_task_instance.queue +priority = ti.priority_weight +queue = ti.queue self.log.info( "Sending %s to executor with priority %s and queue %s", -simple_task_instance.key, priority, queue +ti.key, priority, queue ) self.executor.queue_command( Review comment: Oh queue_task_instance already exists -- I forgot you(?) added that already
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r367151106 ## File path: airflow/models/dagrun.py ## @@ -268,10 +290,14 @@ def update_state(self, session=None): Determines the overall state of the DagRun based on the state of its TaskInstances. -:return: State +:return: state, schedulable_task_instances +:rtype: (State, list[airflow.models.TaskInstance]) Review comment: Yup, that's what I'm changing it to. Link for reference http://www.sphinx-doc.org/en/master/usage/restructuredtext/domains.html#info-field-lists (at the end of that section, just before "Cross-referencing Python objects")
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r367144619 ## File path: airflow/models/dagrun.py ## @@ -268,10 +290,14 @@ def update_state(self, session=None): Determines the overall state of the DagRun based on the state of its TaskInstances. -:return: State +:return: state, schedulable_task_instances +:rtype: (State, list[airflow.models.TaskInstance]) Review comment: This is what the current version renders as: ![image](https://user-images.githubusercontent.com/34150/72477531-a6955900-37e7-11ea-8f9d-f333d195de04.png)
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r367144376 ## File path: airflow/models/dagrun.py ## @@ -268,10 +290,14 @@ def update_state(self, session=None): Determines the overall state of the DagRun based on the state of its TaskInstances. -:return: State +:return: state, schedulable_task_instances +:rtype: (State, list[airflow.models.TaskInstance]) Review comment: For sphinx docs it should be `tuple` and `list`.
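As a rough illustration of the suggestion above, the sketch below writes the return type with Sphinx's `tuple[...]` and `list[...]` container syntax. The function is a hypothetical stand-in that returns placeholder values; the wording finally adopted in the PR may differ.

```python
def update_state(session=None):
    """
    Determines the overall state of the DagRun based on the state
    of its TaskInstances.

    :param session: database session (unused in this stand-in)
    :return: state, schedulable_task_instances
    :rtype: tuple[State, list[airflow.models.TaskInstance]]
    """
    # Hypothetical placeholder values; the real method inspects the database.
    return "running", []
```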
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r367138242 ## File path: airflow/jobs/scheduler_job.py ## @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, :param simple_dag_bag: Should contains all of the task_instances' dags :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag """ -TI = models.TaskInstance # actually enqueue them -for simple_task_instance in simple_task_instances: -simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id) -command = TI.generate_command( -simple_task_instance.dag_id, -simple_task_instance.task_id, -simple_task_instance.execution_date, +for ti in task_instances: +simple_dag = simple_dag_bag.get_dag(ti.dag_id) +command = ti.generate_command( +ti.dag_id, +ti.task_id, +ti.execution_date, local=True, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, -pool=simple_task_instance.pool, +pool=ti.pool, file_path=simple_dag.full_filepath, pickle_id=simple_dag.pickle_id) -priority = simple_task_instance.priority_weight -queue = simple_task_instance.queue +priority = ti.priority_weight +queue = ti.queue self.log.info( "Sending %s to executor with priority %s and queue %s", -simple_task_instance.key, priority, queue +ti.key, priority, queue ) self.executor.queue_command( Review comment: I guess we should
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r367127376 ## File path: airflow/jobs/scheduler_job.py ## @@ -1282,21 +1253,15 @@ def _change_state_for_executable_task_instances(self, task_instances, task_instance.queued_dttm = timezone.utcnow() session.merge(task_instance) -# Generate a list of SimpleTaskInstance for the use of queuing -# them in the executor. -simple_task_instances = [SimpleTaskInstance(ti) for ti in - tis_to_set_to_queued] - task_instance_str = "\n\t".join( [repr(x) for x in tis_to_set_to_queued]) session.commit() self.log.info("Setting the following %s tasks to queued state:\n\t%s", len(tis_to_set_to_queued), task_instance_str) -return simple_task_instances +return tis_to_set_to_queued -def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, - simple_task_instances): +def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instances): Review comment: This is called with the result of `self._change_state_for_executable_task_instances`, which used to return this ``` simple_task_instances = [SimpleTaskInstance(ti) for ti in tis_to_set_to_queued] ``` But one of the slowdowns I noticed was continually re-looking up the TI, so it has been changed to pass full TaskInstance objects around (as we are all in the same process). I'll update the docs to reflect the new code/type
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r367041811 ## File path: airflow/cli/commands/dag_command.py ## @@ -208,8 +208,8 @@ def dag_state(args): dag = get_dag(args.subdir, args.dag_id) else: dag = get_dag_by_file_location(args.dag_id) -dr = DagRun.find(dag.dag_id, execution_date=args.execution_date) -print(dr[0].state if len(dr) > 0 else None) # pylint: disable=len-as-condition +dr = DagRun.find(dag.dag_id, execution_date=args.execution_date).one_or_none() +print(dr.state if dr else None) # pylint: disable=len-as-condition Review comment: Oh yes good spot!
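For readers unfamiliar with the `.one_or_none()` call in the hunk above, here is a minimal pure-Python sketch of the semantics it is generally documented to have in SQLAlchemy: no rows gives `None`, exactly one row gives that row, and more than one row is an error. The helper name and the use of `ValueError` are assumptions for illustration (SQLAlchemy raises its own `MultipleResultsFound` exception).

```python
def one_or_none(rows):
    """Return the only element of rows, or None if rows is empty.

    Hypothetical stand-in for a query's one_or_none(); raises ValueError
    when more than one row is present.
    """
    rows = list(rows)
    if len(rows) > 1:
        raise ValueError("multiple rows found where at most one was expected")
    return rows[0] if rows else None


# Mirrors the shape of the changed CLI code: print the state or None.
dag_run = one_or_none([{"state": "success"}])
print(dag_run["state"] if dag_run else None)
```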
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r365346852 ## File path: airflow/ti_deps/deps/trigger_rule_dep.py ## @@ -34,9 +35,38 @@ class TriggerRuleDep(BaseTIDep): IGNOREABLE = True IS_TASK_DEP = True +@staticmethod +def bake_dep_status_query(): +TI = airflow.models.TaskInstance +# TODO(unknown): this query becomes quite expensive with dags that have many +# tasks. It should be refactored to let the task report to the dag run and get the +# aggregates from there. +q = BAKED_QUERIES(lambda session: session.query( +func.coalesce(func.sum(case([(TI.state == State.SUCCESS, 1)], else_=0)), 0), Review comment: May not be needed after #4751 anyway.
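The `coalesce(sum(case(...)), 0)` expression in the hunk above is a conditional aggregate. A minimal sketch of the same idea in plain SQL, run against an in-memory SQLite database, is shown below. The table layout and the helper names are assumptions for illustration, not Airflow's actual schema or query.

```python
import sqlite3


def make_demo_db():
    """Build a tiny in-memory table loosely modelled on task instances."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, state TEXT)"
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?)",
        [
            ("etl", "extract", "success"),
            ("etl", "transform", "success"),
            ("etl", "load", "failed"),
        ],
    )
    return conn


def count_successes(conn, dag_id):
    """Count 'success' rows for one DAG with a CASE inside SUM.

    SUM over zero rows yields NULL, so COALESCE turns that into 0.
    """
    row = conn.execute(
        "SELECT COALESCE(SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END), 0) "
        "FROM task_instance WHERE dag_id = ?",
        (dag_id,),
    ).fetchone()
    return row[0]
```

Several such `SUM(CASE ...)` columns can be selected in one statement, which is how a single query can return counts for many states at once.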
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357375764 ## File path: airflow/jobs/scheduler_job.py ## @@ -1290,7 +1256,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, """ TI = models.TaskInstance # actually enqueue them -for simple_task_instance in simple_task_instances: +for i, simple_task_instance in enumerate(simple_task_instances): Review comment: Yeah no, this is not needed. Reverting this change.
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357370216 ## File path: airflow/models/dagrun.py ## @@ -286,25 +320,27 @@ def update_state(self, session=None): session=session ) none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks) -none_task_concurrency = all(t.task.task_concurrency is None -for t in unfinished_tasks) -# small speed up -if unfinished_tasks and none_depends_on_past and none_task_concurrency: -# todo: this can actually get pretty slow: one task costs between 0.01-015s -no_dependencies_met = True -for ut in unfinished_tasks: -# We need to flag upstream and check for changes because upstream -# failures/re-schedules can result in deadlock false positives -old_state = ut.state -deps_met = ut.are_dependencies_met( -dep_context=DepContext( -flag_upstream_failed=True, -ignore_in_retry_period=True, -ignore_in_reschedule_period=True), -session=session) -if deps_met or old_state != ut.current_state(session=session): -no_dependencies_met = False -break +none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks) + +no_dependencies_met = True + +dep_context = DepContext(flag_upstream_failed=True) + +for ut in unfinished_tasks: +# We need to flag upstream and check for changes because upstream +# failures/re-schedules can result in deadlock false positives +old_state = ut.state +unmet_deps = list(ut.get_failed_dep_statuses(dep_context=dep_context, session=session)) +unmet_non_delay_dep = any( +unmet_dep for unmet_dep in unmet_deps +if unmet_dep.dep_name not in {NotInRetryPeriodDep.NAME, ReadyToRescheduleDep.NAME} +) + +state = ut.current_state(session=session) Review comment: This function showed up as a noticeable chunk of the profiling time for the DagFileProcessor, so we should decide if it's needed.
Right now, as it is (and was) called, it re-uses the same session, so it would be run in the same transaction. I think this makes the call pointless, as it's asking for the state of the TI, but since it's in the same transaction it will have a consistent view of the row, right? I'm basing this off https://www.postgresql.org/docs/9.5/transaction-iso.html which says > Read Committed is the default isolation level in PostgreSQL. When a transaction uses this isolation level, a SELECT query (without a FOR UPDATE/SHARE clause) sees only data committed before the query began; it never sees either uncommitted data or changes committed during query execution by concurrent transactions. In effect, a SELECT query sees a snapshot of the database as of the instant the query begins to run.
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357368084 ## File path: airflow/models/dagrun.py ## @@ -263,10 +294,14 @@ def update_state(self, session=None): Determines the overall state of the DagRun based on the state Review comment: I don't think so - nothing about it changed, it looks at the TIs associated with the DagRun (based on `self.get_task_instances()`)
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357221905 ## File path: airflow/jobs/scheduler_job.py ## @@ -798,14 +777,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None): dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids) -# Not using multiprocessing.Queue() since it's no longer a separate -# process and due to some unusual behavior. (empty() incorrectly -# returns true as described in https://bugs.python.org/issue23582 ) -ti_keys_to_schedule = [] - -self._process_dags(dagbag, dags, ti_keys_to_schedule) - -for ti_key in ti_keys_to_schedule: +for ti_key in self._process_dags(dagbag, dags, session=session): Review comment: Oh, this was a mistake in my rebase. This should be `for ti in self._process_dags`
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357140421 ## File path: airflow/jobs/scheduler_job.py ## @@ -1290,7 +1256,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, """ TI = models.TaskInstance # actually enqueue them -for simple_task_instance in simple_task_instances: +for i, simple_task_instance in enumerate(simple_task_instances): Review comment: Used by the `return` -- basically this avoids iterating over the loop, and then counting the length of the loop again. _This_ might have been why I wanted to avoid the `len()` call, in my large dags there are 1000s of dags. But I suspect this was me just making stuff up that isn't really needed :D
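The counting idea described in this comment can be sketched as below. The function and parameter names are hypothetical. Starting `enumerate` at 1 and pre-setting the counter also covers the empty case, where a bare `return i+1` after the loop would fail because `i` was never bound.

```python
def enqueue_all(items, send):
    """Send every item and return how many were sent, in a single pass.

    Works for any iterable, including generators that have no len().
    """
    count = 0  # stays 0 when items is empty
    for count, item in enumerate(items, start=1):
        send(item)
    return count
```

For a plain list, `len()` is already a constant-time operation, so the single-pass count only pays off when the input is a generator or another iterable without a cheap length.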
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357133237 ## File path: airflow/models/dagrun.py ## @@ -286,25 +321,27 @@ def update_state(self, session=None): session=session Review comment: Will check. I was mostly making these changes based on the hot-spots identified by the profiling, but I have already extended this a bit beyond just that with the refactor of this method. Perhaps I should revert some of these changes and make them into a separate PR?
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357131966 ## File path: airflow/__init__.py ## @@ -48,3 +48,8 @@ login: Optional[Callable] = None integrate_plugins() + + +# Ensure that this query is build in the master process, before we fork of a sub-process to parse the DAGs +from . import ti_deps Review comment: (thought I already left this comment, but can't see it) Yes, this is a hack and needs fixing before this is ready to merge. Putting it somewhere in SchedulerJob (or DagFileProcessorAgent perhaps) might be a good way, as that is the only time we need to prime the cache for this specific query.
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357131107 ## File path: airflow/jobs/scheduler_job.py ## @@ -1318,6 +1284,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, command, priority=priority, queue=queue) +return i+1 Review comment: Hmmm yes. Maybe this changed in the rebasing, or I'm making stuff up :grin:
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357106226 ## File path: airflow/ti_deps/deps/trigger_rule_dep.py ## @@ -34,9 +35,38 @@ class TriggerRuleDep(BaseTIDep): IGNOREABLE = True IS_TASK_DEP = True +@staticmethod +def bake_dep_status_query(): +TI = airflow.models.TaskInstance +# TODO(unknown): this query becomes quite expensive with dags that have many +# tasks. It should be refactored to let the task report to the dag run and get the +# aggregates from there. +q = BAKED_QUERIES(lambda session: session.query( +func.coalesce(func.sum(case([(TI.state == State.SUCCESS, 1)], else_=0)), 0), Review comment: Yeah, I was wondering about having some DB specific optimizations in places. Somewhere else in the Scheduler we can speed it up by doing `UPDATE task_instance ... RETURNING *` to avoid a second query, I think it helped a bit.
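A minimal sketch of the `UPDATE ... RETURNING` idea mentioned here, using an in-memory SQLite database as a stand-in. The table layout, the state names and the helper are assumptions for illustration. `RETURNING` is a backend-specific feature (PostgreSQL has it, SQLite only from version 3.35), so the sketch falls back to a separate `SELECT` where it is unavailable, which is the "second query" the comment wants to avoid.

```python
import sqlite3


def queue_scheduled(conn):
    """Move 'scheduled' rows to 'queued' and return the affected task ids."""
    if sqlite3.sqlite_version_info >= (3, 35, 0):
        # One round trip: the UPDATE itself reports which rows it changed.
        rows = conn.execute(
            "UPDATE task_instance SET state = 'queued' "
            "WHERE state = 'scheduled' RETURNING task_id"
        ).fetchall()
    else:
        # Fallback for backends without RETURNING: two statements.
        rows = conn.execute(
            "SELECT task_id FROM task_instance WHERE state = 'scheduled'"
        ).fetchall()
        conn.execute(
            "UPDATE task_instance SET state = 'queued' WHERE state = 'scheduled'"
        )
    return sorted(row[0] for row in rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)",
    [("a", "scheduled"), ("b", "running"), ("c", "scheduled")],
)
```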
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357105720 ## File path: airflow/jobs/scheduler_job.py ## @@ -798,14 +777,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None): dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids) -# Not using multiprocessing.Queue() since it's no longer a separate -# process and due to some unusual behavior. (empty() incorrectly -# returns true as described in https://bugs.python.org/issue23582 ) -ti_keys_to_schedule = [] - -self._process_dags(dagbag, dags, ti_keys_to_schedule) - -for ti_key in ti_keys_to_schedule: +for ti_key in self._process_dags(dagbag, dags, session=session): Review comment: Yes, again this was deliberate. We went from TI, to TIKey, and then back to TI when it was used! This stops an extra DB lookup.
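The round trip described here (object, to key, back to object) can be sketched with a toy store that counts lookups. All class and function names below are hypothetical stand-ins, not Airflow code; the point is only that passing objects which already live in the same process skips the extra lookup.

```python
from dataclasses import dataclass


@dataclass
class Task:
    """Stand-in for a full task instance object."""
    key: str
    queue: str


class Store:
    """Hypothetical stand-in for the database; counts lookups."""

    def __init__(self, tasks):
        self._by_key = {task.key: task for task in tasks}
        self.lookups = 0

    def get(self, key):
        self.lookups += 1
        return self._by_key[key]


def queues_from_keys(store, keys):
    # Old shape: only keys are passed on, so each object is fetched again.
    return [store.get(key).queue for key in keys]


def queues_from_objects(tasks):
    # New shape: the objects are used directly, with no lookup at all.
    return [task.queue for task in tasks]
```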
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357105421 ## File path: airflow/jobs/scheduler_job.py ## @@ -1318,6 +1284,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, command, priority=priority, queue=queue) +return i+1 Review comment: I'll double check, but I think this avoided a `count()` query when we already knew the answer!
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357104982 ## File path: airflow/jobs/scheduler_job.py ## @@ -1057,30 +1027,34 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): TI = models.TaskInstance DR = models.DagRun DM = models.DagModel -ti_query = ( -session -.query(TI) -.filter(TI.dag_id.in_(simple_dag_bag.dag_ids)) +ti_query = BAKED_QUERIES( +lambda session: session.query(TI).filter( +TI.dag_id.in_(simple_dag_bag.dag_ids) +) .outerjoin( DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) ) -.filter(or_(DR.run_id == None, # noqa: E711 pylint: disable=singleton-comparison -not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' +.filter(or_(DR.run_id.is_(None), +not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' Review comment: I agree, and we should store a "run_type" column or something similar on the dag run table. But this is unchanged behaviour.
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries URL: https://github.com/apache/airflow/pull/6792#discussion_r357104629 ## File path: airflow/jobs/scheduler_job.py ## @@ -1006,8 +978,7 @@ def _change_state_for_tis_without_dagrun(self, ) Stats.gauge('scheduler.tasks.without_dagrun', tis_changed) -@provide_session -def __get_concurrency_maps(self, states, session=None): +def __get_concurrency_maps(self, states, session): Review comment: Because I _want_ to force a session to be reused.
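To illustrate the point about forcing session reuse, the sketch below contrasts a decorated function with a defaulted `session` against an undecorated one that requires it. The decorator is a deliberately simplified, hypothetical version of the `provide_session` pattern (keyword-only handling, fake session class), not Airflow's implementation.

```python
import functools


class FakeSession:
    """Hypothetical stand-in for a database session; counts instances."""
    created = 0

    def __init__(self):
        FakeSession.created += 1


def provide_session(func):
    """Simplified sketch: create a session when the caller passes none."""
    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is None:
            session = FakeSession()  # silently opens a fresh session
        return func(*args, session=session, **kwargs)
    return wrapper


@provide_session
def lenient_maps(states, session=None):
    return session


def strict_maps(states, session):
    # No decorator and no default: forgetting the session is a TypeError,
    # so the caller's existing session is always the one that gets used.
    return session
```

With the strict form, a call site that forgets to pass its session fails immediately instead of quietly running in a new session and transaction.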