[jira] [Comment Edited] (AIRFLOW-111) DAG concurrency is not honored
[ https://issues.apache.org/jira/browse/AIRFLOW-111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319568#comment-15319568 ]

Shenghu Yang edited comment on AIRFLOW-111 at 6/11/16 8:41 PM:
---

I think the bug is here:
https://github.com/apache/incubator-airflow/blob/18009d03311a0b29e14811865e0b13b19427b5e4/airflow/models.py#L1233-L1247

Suppose a task instance has already been set to 'QUEUED'. The next time we hit the above code, it will NOT check 'self.task.dag.concurrency_reached', so it will run the task instance beyond the dag's concurrency limit.

One fix would be to add the following right after the above code:

    if self.task.dag.concurrency_reached:
        logging.warning(
            'Job_id={} continues to wait, since dag concurrency limit [{}] '
            'is reached'.format(self.job_id, self.task.dag.concurrency))
        return
    else:
        logging.info(
            'Job_id={} starts to run, since dag concurrency limit [{}] '
            'is not reached'.format(self.job_id, self.task.dag.concurrency))

> DAG concurrency is not honored
> ------------------------------
>
> Key: AIRFLOW-111
> URL: https://issues.apache.org/jira/browse/AIRFLOW-111
> Project: Apache Airflow
> Issue Type: Sub-task
> Components: celery, scheduler
> Affects Versions: Airflow 1.6.2, Airflow 1.7.1.2
> Environment: Version of Airflow: 1.6.2
> Airflow configuration: Running a Scheduler with LocalExecutor
> Operating System: 3.13.0-74-generic #118-Ubuntu SMP Thu Dec 17 22:52:10 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
> Python Version: 2.7.6
> Screen shots of your DAG's status:
> Reporter: Shenghu Yang
> Fix For: Airflow 2.0
>
> Description of Issue
> In airflow.cfg, we set: max_active_runs_per_dag = 1
> In our dag, we set dag_args['concurrency'] = 8. However, when the scheduler starts to run, this concurrency is not honored: the airflow scheduler will run up to 'parallelism' (we set it to 25) task instances for the ONE running dag_run.
> What did you expect to happen?
> dag_args['concurrency'] = 8 is honored, i.e. at most 8 task instances run concurrently.
> What happened instead?
> When the dag starts to run, the concurrency is not honored: the airflow scheduler/celery worker will run up to 'parallelism' (we set it to 25) task instances.
> Here is how you can reproduce this issue on your machine:
> Create a dag which contains nothing but 25 parallelized tasks.
> Set the dag's dag_args['concurrency'] = 8.
> Set the airflow parallelism = 25, and max_active_runs_per_dag = 1.
> Then run: airflow scheduler
> You will see all 25 task instances scheduled to run, not 8.
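The proposed guard can be exercised outside Airflow with a minimal stand-in for the DAG object. This is a hypothetical sketch for illustration only: `FakeDag`, `should_run`, and the `running` counter are invented names, while `concurrency_reached` and the log messages mirror the comment above; the real check would live in airflow/models.py.

```python
import logging

class FakeDag:
    """Stand-in for an Airflow DAG; tracks a hypothetical running-task count."""
    def __init__(self, concurrency, running):
        self.concurrency = concurrency  # dag-level concurrency limit
        self.running = running          # currently running task instances

    @property
    def concurrency_reached(self):
        # Mirrors the semantics of DAG.concurrency_reached in the report
        return self.running >= self.concurrency

def should_run(dag, job_id):
    """Re-check the DAG concurrency limit before running a QUEUED task instance."""
    if dag.concurrency_reached:
        logging.warning(
            'Job_id={} continues to wait, since dag concurrency limit [{}] '
            'is reached'.format(job_id, dag.concurrency))
        return False
    logging.info(
        'Job_id={} starts to run, since dag concurrency limit [{}] '
        'is not reached'.format(job_id, dag.concurrency))
    return True
```

With the guard in place, a task instance whose DAG already has `concurrency` running task instances waits instead of starting.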
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (AIRFLOW-111) DAG concurrency is not honored
[ https://issues.apache.org/jira/browse/AIRFLOW-111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shenghu Yang updated AIRFLOW-111:
---------------------------------

Description:
Description of Issue
In airflow.cfg, we set: max_active_runs_per_dag = 1
In our dag, we set dag_args['concurrency'] = 8. However, when the scheduler starts to run, this concurrency is not honored: the airflow scheduler will run up to 'parallelism' (we set it to 25) task instances for the ONE running dag_run.
What did you expect to happen?
dag_args['concurrency'] = 8 is honored, i.e. at most 8 task instances run concurrently.
What happened instead?
When the dag starts to run, the concurrency is not honored: the airflow scheduler/celery worker will run up to 'parallelism' (we set it to 25) task instances.
Here is how you can reproduce this issue on your machine:
Create a dag which contains nothing but 25 parallelized tasks.
Set the dag's dag_args['concurrency'] = 8.
Set the airflow parallelism = 25, and max_active_runs_per_dag = 1.
Then run: airflow scheduler
You will see all 25 task instances scheduled to run, not 8.

was:
Description of Issue
In our dag, we set dag_args['concurrency'] = 8. However, when the scheduler starts to run, this concurrency is not honored: the airflow scheduler will run up to 'parallelism' (we set it to 25) jobs.
What did you expect to happen?
dag_args['concurrency'] = 8 is honored, i.e. at most 8 jobs run concurrently.
What happened instead?
When the dag starts to run, the concurrency is not honored: the airflow scheduler/celery worker will run up to 'parallelism' (we set it to 25) jobs.
Here is how you can reproduce this issue on your machine:
Create a dag which contains nothing but 25 parallelized jobs.
Set the dag's dag_args['concurrency'] = 8.
Set the airflow parallelism to 25.
Then run: airflow scheduler
You will see all 25 jobs scheduled to run, not 8.
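For reference, the reported setup can be summarized as a config fragment. This is a sketch only: the `[core]` section placement follows the stock airflow.cfg layout, and the key names are taken verbatim from the report.

```ini
# airflow.cfg fragment reproducing the reported settings (sketch)
[core]
# upper bound on task instances across the whole installation
parallelism = 25
# only one active dag_run per dag at a time
max_active_runs_per_dag = 1
```

The per-DAG limit, `dag_args['concurrency'] = 8`, is set in the DAG file itself; the bug is that the scheduler runs up to `parallelism` task instances for the single active dag_run instead of honoring it.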
[jira] [Created] (AIRFLOW-236) Support passing S3 credentials through environmental variables
Jakob Homan created AIRFLOW-236:
---

Summary: Support passing S3 credentials through environmental variables
Key: AIRFLOW-236
URL: https://issues.apache.org/jira/browse/AIRFLOW-236
Project: Apache Airflow
Issue Type: Improvement
Components: core
Reporter: Jakob Homan

Right now we expect S3 configs to be passed through one of a variety of config files, or through extra parameters in the connection screen. It'd be nice to be able to pass these through environment variables, and to note as much through the extra parameters. This would lessen the need to include credentials in the webapp itself. Alternatively, for logging (rather than as a connector), it might be better for Airflow to use the profile defined as AWS_DEFAULT and avoid needing an explicit configuration at all.
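The improvement being requested could look roughly like the following sketch: check the standard AWS environment variables first, and fall back to whatever the configured source provides. `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are the conventional boto variable names; the `s3_credentials` helper and its fallback argument are hypothetical, not part of Airflow.

```python
import os

def s3_credentials(fallback=None):
    """
    Return (access_key, secret_key), preferring environment variables over a
    fallback pair (e.g. values parsed from a config file or connection extras).
    """
    access = os.environ.get('AWS_ACCESS_KEY_ID')
    secret = os.environ.get('AWS_SECRET_ACCESS_KEY')
    if access and secret:
        return access, secret
    return fallback  # None when no credentials are configured anywhere
```

This keeps secrets out of the connection screen entirely: the webapp would only need to note that credentials come from the environment.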
[jira] [Commented] (AIRFLOW-68) Align start_date with the schedule_interval
[ https://issues.apache.org/jira/browse/AIRFLOW-68?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325832#comment-15325832 ]

ASF subversion and git services commented on AIRFLOW-68:

Commit f69eec3b44cbb0a0fb46a17baec195f7f3baf50e in incubator-airflow's branch refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=f69eec3 ]

[AIRFLOW-68] Align start_date with the schedule_interval

This particular issue arises because of an alignment issue between start_date and schedule_interval. It can only happen with cron-based schedule_intervals that describe absolute points in time (like “1am”), as opposed to time deltas (like “every hour”).

In the past (and in the docs) we have simply said that users must make sure the two params agree, but this is counterintuitive: in these cases, start_date is more like telling the scheduler to “start paying attention” than “this is my first execution date”.

This patch changes the behavior of the scheduler. The next run date of the dag will be treated as "start_date + interval", unless the start_date is on the (previous) interval, in which case the start_date will be the next run date.

> Align start_date with the schedule_interval
> -------------------------------------------
>
> Key: AIRFLOW-68
> URL: https://issues.apache.org/jira/browse/AIRFLOW-68
> Project: Apache Airflow
> Issue Type: Sub-task
> Components: scheduler
> Affects Versions: Airflow 1.7.1
> Reporter: Bolke de Bruin
> Fix For: Airflow 1.8
[1/2] incubator-airflow git commit: [AIRFLOW-68] Align start_date with the schedule_interval
Repository: incubator-airflow
Updated Branches: refs/heads/master b7def7f1f -> 901e8f2a9

[AIRFLOW-68] Align start_date with the schedule_interval

This particular issue arises because of an alignment issue between start_date and schedule_interval. This can only happen with cron-based schedule_intervals that describe absolute points in time (like “1am”) as opposed to time deltas (like “every hour”). In the past (and in the docs) we have simply said that users must make sure the two params agree. But this is counter intuitive. As in these cases, start_date is sort of like telling the scheduler to “start paying attention” as opposed to “this is my first execution date”. This patch changes the behavior of the scheduler. The next run date of the dag will be treated as "start_date + interval" unless the start_date is on the (previous) interval in which case the start_date will be the next run date.

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f69eec3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f69eec3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f69eec3b
Branch: refs/heads/master
Commit: f69eec3b44cbb0a0fb46a17baec195f7f3baf50e
Parents: 03ce4b9
Author: Bolke de Bruin
Authored: Tue Jun 7 09:52:05 2016 +0200
Committer: Bolke de Bruin
Committed: Tue Jun 7 10:52:09 2016 +0200
--
 airflow/jobs.py   | 12 ++--
 airflow/models.py | 15 +++
 docs/faq.rst      | 15 ++-
 tests/jobs.py     | 52 --
 4 files changed, 85 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 5aaab3b..3a2d97a 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -424,13 +424,21 @@ class SchedulerJob(BaseJob):
                 # First run
                 task_start_dates = [t.start_date for t in dag.tasks]
                 if task_start_dates:
-                    next_run_date = min(task_start_dates)
+                    next_run_date = dag.normalize_schedule(min(task_start_dates))
+                    self.logger.debug("Next run date based on tasks {}"
+                                      .format(next_run_date))
                 else:
                     next_run_date = dag.following_schedule(last_scheduled_run)

             # don't ever schedule prior to the dag's start_date
             if dag.start_date:
-                next_run_date = dag.start_date if not next_run_date else max(next_run_date, dag.start_date)
+                next_run_date = (dag.start_date if not next_run_date
+                                 else max(next_run_date, dag.start_date))
+                if next_run_date == dag.start_date:
+                    next_run_date = dag.normalize_schedule(dag.start_date)
+
+                self.logger.debug("Dag start date: {}. Next run date: {}"
+                                  .format(dag.start_date, next_run_date))

             # this structure is necessary to avoid a TypeError from concatenating
             # NoneType

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 08d0890..b6b7987 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2567,6 +2567,21 @@ class DAG(LoggingMixin):
         elif isinstance(self._schedule_interval, timedelta):
             return dttm - self._schedule_interval

+    def normalize_schedule(self, dttm):
+        """
+        Returns dttm + interval unless dttm is first interval then it returns dttm
+        """
+        following = self.following_schedule(dttm)
+
+        # in case of @once
+        if not following:
+            return dttm
+
+        if self.previous_schedule(following) != dttm:
+            return following
+
+        return dttm
+
     @property
     def tasks(self):
         return list(self.task_dict.values())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/docs/faq.rst
--
diff --git a/docs/faq.rst b/docs/faq.rst
index 21623fc..e61c1bf 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -78,12 +78,17 @@ We recommend against using dynamic values as ``start_date``, especially
 once the period closes, and in theory an ``@hourly`` DAG would never get to
 an hour after now as ``now()`` moves along.

-We also recommend using rounded ``start_date`` in relation to your
-``schedule_interval``. This means an
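The normalize_schedule logic in the patch can be demonstrated standalone with a toy hourly schedule. This is a simplified, hypothetical harness (the real method lives on airflow.models.DAG and uses croniter): `HourlyDag` mimics an `@hourly` cron grid, where `following_schedule` returns the next point on the grid and `previous_schedule` the prior one, matching the semantics the patch relies on.

```python
from datetime import datetime, timedelta

class HourlyDag:
    """Toy stand-in for a DAG with an '@hourly' cron schedule_interval."""

    def following_schedule(self, dttm):
        # next point on the hour grid strictly after dttm
        base = dttm.replace(minute=0, second=0, microsecond=0)
        return base + timedelta(hours=1)

    def previous_schedule(self, dttm):
        # closest point on the hour grid strictly before dttm
        base = dttm.replace(minute=0, second=0, microsecond=0)
        if base == dttm:
            return base - timedelta(hours=1)
        return base

    def normalize_schedule(self, dttm):
        """Returns dttm + interval unless dttm is already on the interval grid."""
        following = self.following_schedule(dttm)
        if not following:          # covers @once in the real DAG
            return dttm
        if self.previous_schedule(following) != dttm:
            return following      # dttm was off-grid: round up
        return dttm               # dttm sits exactly on the grid
```

So a start_date of 10:00 stays 10:00 (already aligned), while 10:30 normalizes to 11:00: the scheduler rounds off-grid start dates up to the next interval boundary instead of silently misaligning runs.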
[2/2] incubator-airflow git commit: Merge branch 'align_startdate'
Merge branch 'align_startdate'

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/901e8f2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/901e8f2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/901e8f2a
Branch: refs/heads/master
Commit: 901e8f2a95cda2bf90b3776df2e2565cfca7925e
Parents: b7def7f f69eec3
Author: Bolke de Bruin
Authored: Sat Jun 11 13:47:27 2016 +0200
Committer: Bolke de Bruin
Committed: Sat Jun 11 13:47:27 2016 +0200
--
 airflow/jobs.py   | 12 ++--
 airflow/models.py | 15 +++
 docs/faq.rst      | 15 ++-
 tests/jobs.py     | 52 --
 4 files changed, 85 insertions(+), 9 deletions(-)
--
[jira] [Commented] (AIRFLOW-31) Use standard imports for hooks/operators
[ https://issues.apache.org/jira/browse/AIRFLOW-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325763#comment-15325763 ]

Chris Riccomini commented on AIRFLOW-31:

Please have a look at AIRFLOW-200 and [this|https://github.com/apache/incubator-airflow/pull/1586] PR. While it does not use standard imports, it does improve the experience for the current import style, so that people can see why hooks/operators can't import, and it makes loading lazy.

> Use standard imports for hooks/operators
> ----------------------------------------
>
> Key: AIRFLOW-31
> URL: https://issues.apache.org/jira/browse/AIRFLOW-31
> Project: Apache Airflow
> Issue Type: Improvement
> Components: operators
> Reporter: Jeremiah Lowin
> Assignee: Jeremiah Lowin
> Labels: enhancement
> Fix For: Airflow 2.0
>
> (Migrated from https://github.com/airbnb/airflow/issues/1238)
> Currently, Airflow uses a relatively complex import mechanism to import hooks and operators without polluting the namespace with submodules. I would like to propose that Airflow abandon that system and use standard Python importing.
> Here are a few major reasons why I think the current system has run its course.
> h3. Polluting namespace
> The biggest advantage of the current system, as I understand it, is that only Operators appear in the `airflow.operators` namespace. The submodules that actually contain the operators do not.
> So for example, while `airflow.operators.python_operator.PythonOperator` is a thing, `PythonOperator` is in the `airflow.operators` namespace but `python_operator` is not.
> I think this sort of namespace pollution was helpful when Airflow was a smaller project, but as the number of hooks/operators grows -- and especially as the `contrib` hooks/operators grow -- I'd argue that namespacing is a *good thing*. It provides structure and organization, and opportunities for documentation (through module docstrings).
> In fact, I'd argue that the current namespace is itself getting quite polluted -- the only way to know what's available is to use something like IPython tab-completion to browse an alphabetical list of Operator names, or to load the source file and grok the import definition (which no one installing from pypi is likely to do).
> h3. Conditional imports
> There's a second advantage to the current system: any module that fails to import is silently ignored. It makes it easy to have optional dependencies. For example, if someone doesn't have `boto` installed, then they don't have an `S3Hook` either. Same for a HiveOperator.
> Again, as Airflow grows and matures, I think this is a little too magic. If my environment is missing a dependency, I want to hear about it.
> On the other hand, the `contrib` namespace sort of depends on this -- we don't want users to have to install every single dependency. So I propose that contrib modules all live in their submodules: `from airflow.contrib.operators.my_operator import MyOperator`. As mentioned previously, having structure and namespacing is a good thing as the project gets more complex.
> Other ways to handle this include putting "non-standard" dependencies inside the operator/hook rather than the module (see `HiveOperator`/`HiveHook`), so it can be imported but not used. Another is judicious use of `try`/`except ImportError`. The simplest is to make people import things explicitly from submodules.
> h3. Operator dependencies
> Right now, operators can't depend on each other if they aren't in the same file. This is for the simple reason that there is no guarantee on what order the operators will be loaded. It all comes down to which dictionary key gets loaded first. One day Operator B could be loaded after Operator A; the next day it might be loaded before. Consequently, A and B can't depend on each other.
> Worse, if a user makes two operators that do depend on each other, they won't get an error message when one fails to import.
> For contrib modules in particular, this is sort of a killer.
> h3. Ease of use
> It's *hard* to set up imports for a new operator. The dictionary-based import instructions aren't obvious for new users, and errors are silently dismissed, which makes debugging difficult.
> h3. Identity
> Surprisingly, `airflow.operators.SubDagOperator != airflow.operators.subdag_operator.SubDagOperator`. See #1168.
> h2. Proposal
> Use standard python importing for hooks/operators/etc.
> - `__init__.py` files use straightforward, standard Python imports
> - major operators are available at `airflow.operators.OperatorName` or `airflow.operators.operator_module.OperatorName`.
> - contrib operators are only available at `airflow.contrib.operators.operator_module.OperatorName` in order
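The `try`/`except ImportError` option mentioned in the discussion can be sketched as follows. This is an illustrative pattern only, not Airflow's actual `__init__.py`: `some_optional_extra` and `OptionalHook` are hypothetical names standing in for an optional dependency like `boto` and the hook that needs it. The module stays importable without the dependency, and fails loudly only when the hook is actually used.

```python
# Guarded import: the module itself imports cleanly even when the optional
# dependency is missing; only instantiation of the hook raises.
try:
    import some_optional_extra  # hypothetical optional dependency (e.g. boto)
    HAS_EXTRA = True
except ImportError:
    some_optional_extra = None
    HAS_EXTRA = False

class OptionalHook:
    """Importable without the extra dependency; raises loudly on use."""
    def __init__(self):
        if not HAS_EXTRA:
            raise ImportError(
                'OptionalHook requires some_optional_extra; install it first')
```

Compared with silently dropping un-importable modules from the namespace, this keeps the "optional dependency" ergonomics while giving the user an explicit error message at the point of use.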