[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367161194
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
         :param simple_dag_bag: Should contains all of the task_instances' dags
         :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
         """
-        TI = models.TaskInstance
         # actually enqueue them
-        for simple_task_instance in simple_task_instances:
-            simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
-            command = TI.generate_command(
-                simple_task_instance.dag_id,
-                simple_task_instance.task_id,
-                simple_task_instance.execution_date,
+        for ti in task_instances:
+            simple_dag = simple_dag_bag.get_dag(ti.dag_id)
+            command = ti.generate_command(
+                ti.dag_id,
+                ti.task_id,
+                ti.execution_date,
                 local=True,
                 mark_success=False,
                 ignore_all_deps=False,
                 ignore_depends_on_past=False,
                 ignore_task_deps=False,
                 ignore_ti_state=False,
-                pool=simple_task_instance.pool,
+                pool=ti.pool,
                 file_path=simple_dag.full_filepath,
                 pickle_id=simple_dag.pickle_id)
 
-            priority = simple_task_instance.priority_weight
-            queue = simple_task_instance.queue
+            priority = ti.priority_weight
+            queue = ti.queue
             self.log.info(
                 "Sending %s to executor with priority %s and queue %s",
-                simple_task_instance.key, priority, queue
+                ti.key, priority, queue
             )
 
             self.executor.queue_command(
 
 Review comment:
   I've put a TODO comment in here. I'm not happy about doing that, but I think this PR is big enough already.




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367156221
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
         :param simple_dag_bag: Should contains all of the task_instances' dags
         :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
         """
-        TI = models.TaskInstance
         # actually enqueue them
-        for simple_task_instance in simple_task_instances:
-            simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
-            command = TI.generate_command(
-                simple_task_instance.dag_id,
-                simple_task_instance.task_id,
-                simple_task_instance.execution_date,
+        for ti in task_instances:
+            simple_dag = simple_dag_bag.get_dag(ti.dag_id)
+            command = ti.generate_command(
+                ti.dag_id,
+                ti.task_id,
+                ti.execution_date,
                 local=True,
                 mark_success=False,
                 ignore_all_deps=False,
                 ignore_depends_on_past=False,
                 ignore_task_deps=False,
                 ignore_ti_state=False,
-                pool=simple_task_instance.pool,
+                pool=ti.pool,
                 file_path=simple_dag.full_filepath,
                 pickle_id=simple_dag.pickle_id)
 
-            priority = simple_task_instance.priority_weight
-            queue = simple_task_instance.queue
+            priority = ti.priority_weight
+            queue = ti.queue
             self.log.info(
                 "Sending %s to executor with priority %s and queue %s",
-                simple_task_instance.key, priority, queue
+                ti.key, priority, queue
             )
 
             self.executor.queue_command(
 
 Review comment:
   Fixing this properly will also fix 
https://issues.apache.org/jira/browse/AIRFLOW-3877




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367154648
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
         :param simple_dag_bag: Should contains all of the task_instances' dags
         :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
         """
-        TI = models.TaskInstance
         # actually enqueue them
-        for simple_task_instance in simple_task_instances:
-            simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
-            command = TI.generate_command(
-                simple_task_instance.dag_id,
-                simple_task_instance.task_id,
-                simple_task_instance.execution_date,
+        for ti in task_instances:
+            simple_dag = simple_dag_bag.get_dag(ti.dag_id)
+            command = ti.generate_command(
+                ti.dag_id,
+                ti.task_id,
+                ti.execution_date,
                 local=True,
                 mark_success=False,
                 ignore_all_deps=False,
                 ignore_depends_on_past=False,
                 ignore_task_deps=False,
                 ignore_ti_state=False,
-                pool=simple_task_instance.pool,
+                pool=ti.pool,
                 file_path=simple_dag.full_filepath,
                 pickle_id=simple_dag.pickle_id)
 
-            priority = simple_task_instance.priority_weight
-            queue = simple_task_instance.queue
+            priority = ti.priority_weight
+            queue = ti.queue
             self.log.info(
                 "Sending %s to executor with priority %s and queue %s",
-                simple_task_instance.key, priority, queue
+                ti.key, priority, queue
             )
 
             self.executor.queue_command(
 
 Review comment:
   This is actually more complex than it seems -- calling queue_task_instance sends it via `ti.command_as_list`, but to get the same behaviour with the file path we'd need `ti.task.dag` to be set, and the ti doesn't have a task set here.




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367151692
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
         :param simple_dag_bag: Should contains all of the task_instances' dags
         :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
         """
-        TI = models.TaskInstance
         # actually enqueue them
-        for simple_task_instance in simple_task_instances:
-            simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
-            command = TI.generate_command(
-                simple_task_instance.dag_id,
-                simple_task_instance.task_id,
-                simple_task_instance.execution_date,
+        for ti in task_instances:
+            simple_dag = simple_dag_bag.get_dag(ti.dag_id)
+            command = ti.generate_command(
+                ti.dag_id,
+                ti.task_id,
+                ti.execution_date,
                 local=True,
                 mark_success=False,
                 ignore_all_deps=False,
                 ignore_depends_on_past=False,
                 ignore_task_deps=False,
                 ignore_ti_state=False,
-                pool=simple_task_instance.pool,
+                pool=ti.pool,
                 file_path=simple_dag.full_filepath,
                 pickle_id=simple_dag.pickle_id)
 
-            priority = simple_task_instance.priority_weight
-            queue = simple_task_instance.queue
+            priority = ti.priority_weight
+            queue = ti.queue
             self.log.info(
                 "Sending %s to executor with priority %s and queue %s",
-                simple_task_instance.key, priority, queue
+                ti.key, priority, queue
             )
 
             self.executor.queue_command(
 
 Review comment:
   Oh queue_task_instance already exists -- I forgot you(?) added that already




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367151106
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -268,10 +290,14 @@ def update_state(self, session=None):
         Determines the overall state of the DagRun based on the state
         of its TaskInstances.
 
-        :return: State
+        :return: state, schedulable_task_instances
+        :rtype: (State, list[airflow.models.TaskInstance])
 
 Review comment:
   Yup, that's what I'm changing it to. Link for reference: http://www.sphinx-doc.org/en/master/usage/restructuredtext/domains.html#info-field-lists (at the end of that section, just before "Cross-referencing Python objects").
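   
   For context, a minimal sketch of the docstring form under discussion (hedged -- the exact wording that landed in the PR may differ):
   
```python
def update_state(self, session=None):
    """
    Determines the overall state of the DagRun based on the state
    of its TaskInstances.

    :return: state, schedulable_task_instances
    :rtype: tuple(airflow.utils.state.State, list[airflow.models.TaskInstance])
    """
```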




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367144619
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -268,10 +290,14 @@ def update_state(self, session=None):
         Determines the overall state of the DagRun based on the state
         of its TaskInstances.
 
-        :return: State
+        :return: state, schedulable_task_instances
+        :rtype: (State, list[airflow.models.TaskInstance])
 
 Review comment:
   This is what the current version renders as:
   
   
![image](https://user-images.githubusercontent.com/34150/72477531-a6955900-37e7-11ea-8f9d-f333d195de04.png)
   




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367144376
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -268,10 +290,14 @@ def update_state(self, session=None):
         Determines the overall state of the DagRun based on the state
         of its TaskInstances.
 
-        :return: State
+        :return: state, schedulable_task_instances
+        :rtype: (State, list[airflow.models.TaskInstance])
 
 Review comment:
   For Sphinx docs it should be `tuple` and `list`.




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367138242
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1306,33 +1271,32 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
         :param simple_dag_bag: Should contains all of the task_instances' dags
         :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
         """
-        TI = models.TaskInstance
         # actually enqueue them
-        for simple_task_instance in simple_task_instances:
-            simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
-            command = TI.generate_command(
-                simple_task_instance.dag_id,
-                simple_task_instance.task_id,
-                simple_task_instance.execution_date,
+        for ti in task_instances:
+            simple_dag = simple_dag_bag.get_dag(ti.dag_id)
+            command = ti.generate_command(
+                ti.dag_id,
+                ti.task_id,
+                ti.execution_date,
                 local=True,
                 mark_success=False,
                 ignore_all_deps=False,
                 ignore_depends_on_past=False,
                 ignore_task_deps=False,
                 ignore_ti_state=False,
-                pool=simple_task_instance.pool,
+                pool=ti.pool,
                 file_path=simple_dag.full_filepath,
                 pickle_id=simple_dag.pickle_id)
 
-            priority = simple_task_instance.priority_weight
-            queue = simple_task_instance.queue
+            priority = ti.priority_weight
+            queue = ti.queue
             self.log.info(
                 "Sending %s to executor with priority %s and queue %s",
-                simple_task_instance.key, priority, queue
+                ti.key, priority, queue
             )
 
             self.executor.queue_command(
 
 Review comment:
   I guess we should




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367127376
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1282,21 +1253,15 @@ def _change_state_for_executable_task_instances(self, task_instances,
             task_instance.queued_dttm = timezone.utcnow()
             session.merge(task_instance)
 
-        # Generate a list of SimpleTaskInstance for the use of queuing
-        # them in the executor.
-        simple_task_instances = [SimpleTaskInstance(ti) for ti in
-                                 tis_to_set_to_queued]
-
         task_instance_str = "\n\t".join(
             [repr(x) for x in tis_to_set_to_queued])
 
         session.commit()
         self.log.info("Setting the following %s tasks to queued state:\n\t%s",
                       len(tis_to_set_to_queued), task_instance_str)
-        return simple_task_instances
+        return tis_to_set_to_queued
 
-    def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
-                                                  simple_task_instances):
+    def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instances):
 
 Review comment:
   This is called with the result of `self._change_state_for_executable_task_instances`, which used to return this:
   
   ```
   simple_task_instances = [SimpleTaskInstance(ti) for ti in
                            tis_to_set_to_queued]
   ```
   
   But one of the slowdowns I noticed was continually re-looking up the TI, so it has been changed to pass full TaskInstance objects around (as we are all in the same process).
   
   I'll update the docs to reflect the new code/type.




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-15 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r367041811
 
 

 ##
 File path: airflow/cli/commands/dag_command.py
 ##
 @@ -208,8 +208,8 @@ def dag_state(args):
         dag = get_dag(args.subdir, args.dag_id)
     else:
         dag = get_dag_by_file_location(args.dag_id)
-    dr = DagRun.find(dag.dag_id, execution_date=args.execution_date)
-    print(dr[0].state if len(dr) > 0 else None)  # pylint: disable=len-as-condition
+    dr = DagRun.find(dag.dag_id, execution_date=args.execution_date).one_or_none()
+    print(dr.state if dr else None)  # pylint: disable=len-as-condition
 
 Review comment:
   Oh yes good spot!




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2020-01-10 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r365346852
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,9 +35,38 @@ class TriggerRuleDep(BaseTIDep):
     IGNOREABLE = True
     IS_TASK_DEP = True
 
+    @staticmethod
+    def bake_dep_status_query():
+        TI = airflow.models.TaskInstance
+        # TODO(unknown): this query becomes quite expensive with dags that have many
+        # tasks. It should be refactored to let the task report to the dag run and get the
+        # aggregates from there.
+        q = BAKED_QUERIES(lambda session: session.query(
+            func.coalesce(func.sum(case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
 
 Review comment:
   May not be needed after #4751 anyway.
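   
   For readers unfamiliar with the pattern in the diff: `BAKED_QUERIES` appears to be a SQLAlchemy "baked query" bakery, which caches the compiled form of a query so it is only built once per process and only the bound parameters change afterwards. A minimal, self-contained sketch of that idea (illustrative only -- it uses a toy table and plain `sqlalchemy.ext.baked`, not Airflow's models):
   
```python
from sqlalchemy import Column, Integer, String, bindparam, create_engine, func
from sqlalchemy.ext import baked
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class TaskInstance(Base):  # toy stand-in for airflow.models.TaskInstance
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)
    state = Column(String)


bakery = baked.bakery()  # plays the role of BAKED_QUERIES in the diff


def count_tis(session, dag_id):
    # The lambda only runs the first time; later calls reuse the cached,
    # already-compiled query and just rebind the parameters.
    baked_query = bakery(
        lambda s: s.query(func.count(TaskInstance.id)).filter(
            TaskInstance.dag_id == bindparam("dag_id")
        )
    )
    return baked_query(session).params(dag_id=dag_id).scalar()


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
print(count_tis(session, "example_dag"))  # 0 -- the toy table is empty
```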




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357375764
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1290,7 +1256,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
         """
         TI = models.TaskInstance
         # actually enqueue them
-        for simple_task_instance in simple_task_instances:
+        for i, simple_task_instance in enumerate(simple_task_instances):
 
 Review comment:
   Yeah no, this is not needed. Reverting this change.




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357370216
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -286,25 +320,27 @@ def update_state(self, session=None):
             session=session
         )
         none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
-        none_task_concurrency = all(t.task.task_concurrency is None
-                                    for t in unfinished_tasks)
-        # small speed up
-        if unfinished_tasks and none_depends_on_past and none_task_concurrency:
-            # todo: this can actually get pretty slow: one task costs between 0.01-015s
-            no_dependencies_met = True
-            for ut in unfinished_tasks:
-                # We need to flag upstream and check for changes because upstream
-                # failures/re-schedules can result in deadlock false positives
-                old_state = ut.state
-                deps_met = ut.are_dependencies_met(
-                    dep_context=DepContext(
-                        flag_upstream_failed=True,
-                        ignore_in_retry_period=True,
-                        ignore_in_reschedule_period=True),
-                    session=session)
-                if deps_met or old_state != ut.current_state(session=session):
-                    no_dependencies_met = False
-                    break
+        none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
+
+        no_dependencies_met = True
+
+        dep_context = DepContext(flag_upstream_failed=True)
+
+        for ut in unfinished_tasks:
+            # We need to flag upstream and check for changes because upstream
+            # failures/re-schedules can result in deadlock false positives
+            old_state = ut.state
+            unmet_deps = list(ut.get_failed_dep_statuses(dep_context=dep_context, session=session))
+            unmet_non_delay_dep = any(
+                unmet_dep for unmet_dep in unmet_deps
+                if unmet_dep.dep_name not in {NotInRetryPeriodDep.NAME, ReadyToRescheduleDep.NAME}
+            )
+
+            state = ut.current_state(session=session)
 
 Review comment:
   This function showed up as a noticeable chunk of the profiling time for the DagFileProcessor, so we should decide if it's needed.
   
   Right now, as it is (and was) called, it re-uses the same session, so it would be run in the same transaction. I think this makes the call pointless: it's asking for the state of the TI, but since it's in the same transaction it will have a consistent view of the row, right?
   
   I'm basing this off https://www.postgresql.org/docs/9.5/transaction-iso.html which says:
   
   > Read Committed is the default isolation level in PostgreSQL. When a transaction uses this isolation level, a SELECT query (without a FOR UPDATE/SHARE clause) sees only data committed before the query began; it never sees either uncommitted data or changes committed during query execution by concurrent transactions. In effect, a SELECT query sees a snapshot of the database as of the instant the query begins to run




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357368084
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -263,10 +294,14 @@ def update_state(self, session=None):
         Determines the overall state of the DagRun based on the state
 
 Review comment:
   I don't think so -- nothing about it changed; it looks at the TIs associated with the DagRun (based on `self.get_task_instances()`).




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357221905
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -798,14 +777,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
 
         dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids)
 
-        # Not using multiprocessing.Queue() since it's no longer a separate
-        # process and due to some unusual behavior. (empty() incorrectly
-        # returns true as described in https://bugs.python.org/issue23582 )
-        ti_keys_to_schedule = []
-
-        self._process_dags(dagbag, dags, ti_keys_to_schedule)
-
-        for ti_key in ti_keys_to_schedule:
+        for ti_key in self._process_dags(dagbag, dags, session=session):
 
 Review comment:
   Oh, this was a mistake in my rebase. This should be `for ti in 
self._process_dags`




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357140421
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1290,7 +1256,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
         """
         TI = models.TaskInstance
         # actually enqueue them
-        for simple_task_instance in simple_task_instances:
+        for i, simple_task_instance in enumerate(simple_task_instances):
 
 Review comment:
   Used by the `return` -- basically this avoids iterating over the loop and then counting its length again.
   
   _This_ might have been why I wanted to avoid the `len()` call; in my large dags there are 1000s of tasks. But I suspect this was me just making stuff up that isn't really needed :D




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357133237
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -286,25 +321,27 @@ def update_state(self, session=None):
             session=session
 
 Review comment:
   Will check. I was mostly making these changes based on the hot-spots identified by the profiling, but I have already extended this a bit beyond just that with the refactor of this method.
   
   Perhaps I should revert some of these changes and make them into a separate PR?




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357131966
 
 

 ##
 File path: airflow/__init__.py
 ##
 @@ -48,3 +48,8 @@
 login: Optional[Callable] = None
 
 integrate_plugins()
+
+
+# Ensure that this query is built in the master process, before we fork off a sub-process to parse the DAGs
+from . import ti_deps
 
 Review comment:
   (thought I already left this comment, but can't see it)
   
   Yes, this is a hack and needs fixing before this is ready to merge. Putting it somewhere in SchedulerJob (or DagFileProcessorAgent perhaps) might be a good way, as that is the only time we need to prime the cache for this specific query.
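   
   One possible shape for that fix, sketched purely as an illustration (this is not the change in the PR; the helper name comes from the diff above, only the placement is the suggestion in this comment):
   
```python
# Prime the baked-query cache once in the parent process so that forked
# DAG-parsing subprocesses inherit the already-built query, instead of
# importing ti_deps from airflow/__init__.py.
from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep


def prime_hot_path_queries():
    """Call this from e.g. SchedulerJob._execute() or
    DagFileProcessorAgent.start(), before the first fork."""
    TriggerRuleDep.bake_dep_status_query()
```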




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357131107
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1318,6 +1284,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
                 command,
                 priority=priority,
                 queue=queue)
+        return i+1
 
 Review comment:
   Hmmm yes. Maybe this changed in the rebasing, or I'm making stuff up :grin:




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357106226
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,9 +35,38 @@ class TriggerRuleDep(BaseTIDep):
     IGNOREABLE = True
     IS_TASK_DEP = True
 
+    @staticmethod
+    def bake_dep_status_query():
+        TI = airflow.models.TaskInstance
+        # TODO(unknown): this query becomes quite expensive with dags that have many
+        # tasks. It should be refactored to let the task report to the dag run and get the
+        # aggregates from there.
+        q = BAKED_QUERIES(lambda session: session.query(
+            func.coalesce(func.sum(case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
 
 Review comment:
   Yeah, I was wondering about having some DB-specific optimizations in places. Somewhere else in the Scheduler we can speed it up by doing `UPDATE task_instance ... RETURNING *` to avoid a second query; I think it helped a bit.
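   
   For reference, a hedged sketch of the `UPDATE ... RETURNING` idea (SQLAlchemy Core against a simplified stand-in table; RETURNING is a PostgreSQL-specific feature and the DSN below is a placeholder):
   
```python
from sqlalchemy import Column, DateTime, MetaData, String, Table, create_engine, update

metadata = MetaData()

# Simplified stand-in for the task_instance table.
task_instance = Table(
    "task_instance",
    metadata,
    Column("task_id", String, primary_key=True),
    Column("dag_id", String, primary_key=True),
    Column("execution_date", DateTime, primary_key=True),
    Column("state", String),
)

# One round trip: flip scheduled rows to queued and read the affected rows
# back via RETURNING, instead of an UPDATE followed by a separate SELECT.
stmt = (
    update(task_instance)
    .where(task_instance.c.state == "scheduled")
    .values(state="queued")
    .returning(
        task_instance.c.task_id,
        task_instance.c.dag_id,
        task_instance.c.execution_date,
        task_instance.c.state,
    )
)

engine = create_engine("postgresql://localhost/airflow")  # placeholder DSN
with engine.connect() as conn:
    queued_rows = conn.execute(stmt).fetchall()
```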




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357105720
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -798,14 +777,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
 
         dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids)
 
-        # Not using multiprocessing.Queue() since it's no longer a separate
-        # process and due to some unusual behavior. (empty() incorrectly
-        # returns true as described in https://bugs.python.org/issue23582 )
-        ti_keys_to_schedule = []
-
-        self._process_dags(dagbag, dags, ti_keys_to_schedule)
-
-        for ti_key in ti_keys_to_schedule:
+        for ti_key in self._process_dags(dagbag, dags, session=session):
 
 Review comment:
   Yes, again this was deliberate.
   
   We went from TI, to TIKey, and then back to TI when it was used! This avoids an extra DB lookup.




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357105421
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1318,6 +1284,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
                 command,
                 priority=priority,
                 queue=queue)
+        return i+1
 
 Review comment:
   I'll double-check, but I think this avoided a `count()` query when we already knew the answer!




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357104982
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1057,30 +1027,34 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
         TI = models.TaskInstance
         DR = models.DagRun
         DM = models.DagModel
-        ti_query = (
-            session
-            .query(TI)
-            .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
+        ti_query = BAKED_QUERIES(
+            lambda session: session.query(TI).filter(
+                TI.dag_id.in_(simple_dag_bag.dag_ids)
+            )
             .outerjoin(
                 DR,
                 and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
             )
-            .filter(or_(DR.run_id == None,  # noqa: E711 pylint: disable=singleton-comparison
-                        not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'
+            .filter(or_(DR.run_id.is_(None),
+                        not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'
 
 Review comment:
   I agree, and we should store a "run_type" column or something similar on the 
dag run table.
   
   But this is unchanged behaviour.
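   
   As a side note on the suggestion above, the difference is roughly the following (the `run_type` column is hypothetical at this point; today the run kind is inferred from the `run_id` prefix):
   
```python
from sqlalchemy import not_

from airflow.jobs.backfill_job import BackfillJob
from airflow.models import DagRun

# Current behaviour (unchanged by this PR): backfill runs are excluded by
# matching the run_id prefix.
non_backfill_filter = not_(DagRun.run_id.like(BackfillJob.ID_PREFIX + '%'))

# With a dedicated column the same intent would be explicit and indexable.
# Hypothetical -- no such column exists at the time of this comment:
# non_backfill_filter = DagRun.run_type != 'backfill'
```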




[GitHub] [airflow] ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL query building for hot-path queries

2019-12-12 Thread GitBox
ashb commented on a change in pull request #6792: [AIRFLOW-5930] Use cached-SQL 
query building for hot-path queries
URL: https://github.com/apache/airflow/pull/6792#discussion_r357104629
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1006,8 +978,7 @@ def _change_state_for_tis_without_dagrun(self,
         )
         Stats.gauge('scheduler.tasks.without_dagrun', tis_changed)
 
-    @provide_session
-    def __get_concurrency_maps(self, states, session=None):
+    def __get_concurrency_maps(self, states, session):
 
 Review comment:
   Because I _want_ to force a session to be reused.

