[jira] [Comment Edited] (AIRFLOW-111) DAG concurrency is not honored

2016-06-11 Thread Shenghu Yang (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319568#comment-15319568
 ] 

Shenghu Yang edited comment on AIRFLOW-111 at 6/11/16 8:41 PM:
---

I think the bug is here:
https://github.com/apache/incubator-airflow/blob/18009d03311a0b29e14811865e0b13b19427b5e4/airflow/models.py#L1233-L1247

Say one task instance has already been set to 'QUEUED'; the next time we hit 
the above code, it will NOT check 'self.task.dag.concurrency_reached', 
hence it will run the task instance beyond the dag's concurrency.

One fix would be to add the following right after the above code:

if self.task.dag.concurrency_reached:
    logging.warning('Job_id={} continues to wait, since dag concurrency '
                    'limit [{}] is reached'.format(self.job_id,
                                                   self.task.dag.concurrency))
    return
else:
    logging.info('Job_id={} starts to run, since dag concurrency limit [{}] '
                 'is not reached'.format(self.job_id,
                                         self.task.dag.concurrency))



was (Author: shenghu):
I think the bug is here:
https://github.com/apache/incubator-airflow/blob/18009d03311a0b29e14811865e0b13b19427b5e4/airflow/models.py#L1233-L1247

Say one task has already been set to 'QUEUED'; the next time we hit the 
above code, it will NOT check 'self.task.dag.concurrency_reached', hence it 
will run the task beyond the dag's concurrency.

One fix would be to add the following right after the above code:

if self.task.dag.concurrency_reached:
    logging.warning('Job_id={} continues to wait, since dag concurrency '
                    'limit [{}] is reached'.format(self.job_id,
                                                   self.task.dag.concurrency))
    return
else:
    logging.info('Job_id={} starts to run, since dag concurrency limit [{}] '
                 'is not reached'.format(self.job_id,
                                         self.task.dag.concurrency))


> DAG concurrency is not honored
> --
>
> Key: AIRFLOW-111
> URL: https://issues.apache.org/jira/browse/AIRFLOW-111
> Project: Apache Airflow
>  Issue Type: Sub-task
>  Components: celery, scheduler
>Affects Versions: Airflow 1.6.2, Airflow 1.7.1.2
> Environment: Version of Airflow: 1.6.2
> Airflow configuration: Running a Scheduler with LocalExecutor
> Operating System: 3.13.0-74-generic #118-Ubuntu SMP Thu Dec 17 22:52:10 UTC 
> 2015 x86_64 x86_64 x86_64 GNU/Linux
> Python Version: 2.7.6
> Screen shots of your DAG's status:
>Reporter: Shenghu Yang
> Fix For: Airflow 2.0
>
>
> Description of Issue
> In airflow.cfg, we set: max_active_runs_per_dag = 1
> In our dag, we set dag_args['concurrency'] = 8. However, when the scheduler 
> starts to run, we can see this concurrency is not being honored: the airflow 
> scheduler will run up to 'parallelism' (we set it to 25) task instances for 
> the ONE active dag_run.
> What did you expect to happen?
> dag_args['concurrency'] = 8 is honored, i.e. at most 8 task instances run 
> concurrently.
> What happened instead?
> When the dag starts to run, the concurrency is not honored: the airflow 
> scheduler/celery worker will run up to 'parallelism' (we set it to 25) task 
> instances.
> Here is how you can reproduce this issue on your machine:
> Create a dag which contains nothing but 25 parallel tasks.
> Set the dag's dag_args['concurrency'] = 8.
> Set the airflow parallelism = 25, and max_active_runs_per_dag = 1.
> Then run: airflow scheduler
> You will see all 25 task instances scheduled to run, not 8.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (AIRFLOW-111) DAG concurrency is not honored

2016-06-11 Thread Shenghu Yang (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shenghu Yang updated AIRFLOW-111:
-
Description: 
Description of Issue

In airflow.cfg, we set: max_active_runs_per_dag = 1

In our dag, we set dag_args['concurrency'] = 8. However, when the scheduler 
starts to run, we can see this concurrency is not being honored: the airflow 
scheduler will run up to 'parallelism' (we set it to 25) task instances for the 
ONE active dag_run.

What did you expect to happen?
dag_args['concurrency'] = 8 is honored, i.e. at most 8 task instances run 
concurrently.

What happened instead?
When the dag starts to run, the concurrency is not honored: the airflow 
scheduler/celery worker will run up to 'parallelism' (we set it to 25) task 
instances.

Here is how you can reproduce this issue on your machine:

Create a dag which contains nothing but 25 parallel tasks (see the sketch 
below).
Set the dag's dag_args['concurrency'] = 8.
Set the airflow parallelism = 25, and max_active_runs_per_dag = 1.
Then run: airflow scheduler
You will see all 25 task instances scheduled to run, not 8.
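
For reference, a minimal sketch of the kind of dag we use to reproduce this 
(the dag_id and task ids are made up for illustration; it assumes 
parallelism = 25 and max_active_runs_per_dag = 1 in airflow.cfg):

from datetime import datetime

from airflow import DAG
from airflow.operators import BashOperator

# 25 parallel tasks in a single dag that is limited to concurrency=8
dag = DAG(
    dag_id='concurrency_repro',
    start_date=datetime(2016, 6, 1),
    schedule_interval='@daily',
    concurrency=8)

for i in range(25):
    BashOperator(
        task_id='sleep_{}'.format(i),
        bash_command='sleep 60',
        dag=dag)

With this dag the scheduler should dispatch at most 8 of the sleep tasks at a 
time, but all 25 end up queued/running.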

  was:
Description of Issue

In our dag, we set the dag_args['concurrency'] = 8, however, when the scheduler 
starts to run, we can see this concurrency is not being honored, airflow 
scheduler will run up to num of the 'parallelism' (we set as 25) jobs.

What did you expect to happen?
dag_args['concurrency'] = 8 is honored, e.g. only run at most 8 jobs 
concurrently.

What happened instead?
when the dag starts to run, we can see the concurrency is not being honored, 
airflow scheduler/celery worker will run up to the 'parallelism' (we set as 25) 
jobs.

Here is how you can reproduce this issue on your machine:

create a dag which contains nothing but 25 parallelized jobs.
set the dag dag_args['concurrency'] = 8
set the airflow parallelism to 25
then run: airflow scheduler
you will see all 25 jobs are scheduled to run, not 8.


> DAG concurrency is not honored
> --
>
> Key: AIRFLOW-111
> URL: https://issues.apache.org/jira/browse/AIRFLOW-111
> Project: Apache Airflow
>  Issue Type: Sub-task
>  Components: celery, scheduler
>Affects Versions: Airflow 1.6.2, Airflow 1.7.1.2
> Environment: Version of Airflow: 1.6.2
> Airflow configuration: Running a Scheduler with LocalExecutor
> Operating System: 3.13.0-74-generic #118-Ubuntu SMP Thu Dec 17 22:52:10 UTC 
> 2015 x86_64 x86_64 x86_64 GNU/Linux
> Python Version: 2.7.6
> Screen shots of your DAG's status:
>Reporter: Shenghu Yang
> Fix For: Airflow 2.0
>
>
> Description of Issue
> In airflow.cfg, we set: max_active_runs_per_dag = 1
> In our dag, we set dag_args['concurrency'] = 8. However, when the scheduler 
> starts to run, we can see this concurrency is not being honored: the airflow 
> scheduler will run up to 'parallelism' (we set it to 25) task instances for 
> the ONE active dag_run.
> What did you expect to happen?
> dag_args['concurrency'] = 8 is honored, i.e. at most 8 task instances run 
> concurrently.
> What happened instead?
> When the dag starts to run, the concurrency is not honored: the airflow 
> scheduler/celery worker will run up to 'parallelism' (we set it to 25) task 
> instances.
> Here is how you can reproduce this issue on your machine:
> Create a dag which contains nothing but 25 parallel tasks.
> Set the dag's dag_args['concurrency'] = 8.
> Set the airflow parallelism = 25, and max_active_runs_per_dag = 1.
> Then run: airflow scheduler
> You will see all 25 task instances scheduled to run, not 8.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (AIRFLOW-236) Support passing S3 credentials through environmental variables

2016-06-11 Thread Jakob Homan (JIRA)
Jakob Homan created AIRFLOW-236:
---

 Summary: Support passing S3 credentials through environmental 
variables
 Key: AIRFLOW-236
 URL: https://issues.apache.org/jira/browse/AIRFLOW-236
 Project: Apache Airflow
  Issue Type: Improvement
  Components: core
Reporter: Jakob Homan


Right now we expect S3 configs to be passed through one of a variety of config 
files, or through extra parameters in the connection screen. It'd be nice to be 
able to pass these through environment variables and note as much in the extra 
parameters. This would lessen the need to include credentials in the webapp 
itself.

Alternatively, for logging (rather than as a connector), it might just be 
better for Airflow to use the profile defined as AWS_DEFAULT and avoid needing 
an explicit configuration at all.
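
For example, something along these lines could work (a rough sketch only; the 
helper name and fallback order are hypothetical, while AWS_ACCESS_KEY_ID / 
AWS_SECRET_ACCESS_KEY are the standard AWS environment variables):

import os

def resolve_s3_credentials(extra_params=None):
    """Hypothetical helper: prefer the connection's extra parameters,
    then fall back to the standard AWS environment variables."""
    extra_params = extra_params or {}
    access_key = (extra_params.get('aws_access_key_id')
                  or os.environ.get('AWS_ACCESS_KEY_ID'))
    secret_key = (extra_params.get('aws_secret_access_key')
                  or os.environ.get('AWS_SECRET_ACCESS_KEY'))
    if not access_key or not secret_key:
        raise ValueError('No S3 credentials found in extras or environment')
    return access_key, secret_key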



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-68) Align start_date with the schedule_interval

2016-06-11 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-68?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325832#comment-15325832
 ] 

ASF subversion and git services commented on AIRFLOW-68:


Commit f69eec3b44cbb0a0fb46a17baec195f7f3baf50e in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=f69eec3 ]

[AIRFLOW-68] Align start_date with the schedule_interval

This particular issue arises because of a misalignment between
start_date and schedule_interval. It can only happen with cron-based
schedule_intervals that describe absolute points in time (like “1am”) as
opposed to time deltas (like “every hour”).

In the past (and in the docs) we have simply said that users must make
sure the two params agree. But this is counterintuitive, as in these
cases start_date is sort of like telling the scheduler to
“start paying attention” as opposed to “this is my first execution date”.

This patch changes the behavior of the scheduler. The next run date of
the dag will be treated as "start_date + interval" unless the start_date
is on the (previous) interval, in which case the start_date will be the
next run date.
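
To make the rule concrete, a small standalone sketch of the alignment logic 
(it assumes croniter, which Airflow uses for cron schedules; this is not the 
committed code itself, see the normalize_schedule() method added by the 
commit):

from datetime import datetime
from croniter import croniter

def normalize_schedule(cron_expr, dttm):
    # move dttm forward to the next point on the schedule,
    # unless dttm already sits exactly on the schedule
    following = croniter(cron_expr, dttm).get_next(datetime)
    if croniter(cron_expr, following).get_prev(datetime) == dttm:
        return dttm
    return following

# schedule "0 1 * * *" (daily at 1am):
print(normalize_schedule('0 1 * * *', datetime(2016, 6, 7, 9, 0)))  # 2016-06-08 01:00
print(normalize_schedule('0 1 * * *', datetime(2016, 6, 7, 1, 0)))  # 2016-06-07 01:00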


> Align start_date with the schedule_interval
> ---
>
> Key: AIRFLOW-68
> URL: https://issues.apache.org/jira/browse/AIRFLOW-68
> Project: Apache Airflow
>  Issue Type: Sub-task
>  Components: scheduler
>Affects Versions: Airflow 1.7.1
>Reporter: Bolke de Bruin
> Fix For: Airflow 1.8
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[1/2] incubator-airflow git commit: [AIRFLOW-68] Align start_date with the schedule_interval

2016-06-11 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master b7def7f1f -> 901e8f2a9


[AIRFLOW-68] Align start_date with the schedule_interval

This particular issue arises because of a misalignment between
start_date and schedule_interval. It can only happen with cron-based
schedule_intervals that describe absolute points in time (like “1am”) as
opposed to time deltas (like “every hour”).

In the past (and in the docs) we have simply said that users must make
sure the two params agree. But this is counterintuitive, as in these
cases start_date is sort of like telling the scheduler to
“start paying attention” as opposed to “this is my first execution date”.

This patch changes the behavior of the scheduler. The next run date of
the dag will be treated as "start_date + interval" unless the start_date
is on the (previous) interval, in which case the start_date will be the
next run date.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f69eec3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f69eec3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f69eec3b

Branch: refs/heads/master
Commit: f69eec3b44cbb0a0fb46a17baec195f7f3baf50e
Parents: 03ce4b9
Author: Bolke de Bruin 
Authored: Tue Jun 7 09:52:05 2016 +0200
Committer: Bolke de Bruin 
Committed: Tue Jun 7 10:52:09 2016 +0200

--
 airflow/jobs.py   | 12 ++--
 airflow/models.py | 15 +++
 docs/faq.rst  | 15 ++-
 tests/jobs.py | 52 --
 4 files changed, 85 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 5aaab3b..3a2d97a 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -424,13 +424,21 @@ class SchedulerJob(BaseJob):
                 # First run
                 task_start_dates = [t.start_date for t in dag.tasks]
                 if task_start_dates:
-                    next_run_date = min(task_start_dates)
+                    next_run_date = dag.normalize_schedule(min(task_start_dates))
+                    self.logger.debug("Next run date based on tasks {}"
+                                      .format(next_run_date))
             else:
                 next_run_date = dag.following_schedule(last_scheduled_run)
 
             # don't ever schedule prior to the dag's start_date
             if dag.start_date:
-                next_run_date = dag.start_date if not next_run_date else max(next_run_date, dag.start_date)
+                next_run_date = (dag.start_date if not next_run_date
+                                 else max(next_run_date, dag.start_date))
+                if next_run_date == dag.start_date:
+                    next_run_date = dag.normalize_schedule(dag.start_date)
+
+                self.logger.debug("Dag start date: {}. Next run date: {}"
+                                  .format(dag.start_date, next_run_date))
 
             # this structure is necessary to avoid a TypeError from concatenating
             # NoneType

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 08d0890..b6b7987 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2567,6 +2567,21 @@ class DAG(LoggingMixin):
         elif isinstance(self._schedule_interval, timedelta):
             return dttm - self._schedule_interval
 
+    def normalize_schedule(self, dttm):
+        """
+        Returns dttm + interval unless dttm is first interval then it returns dttm
+        """
+        following = self.following_schedule(dttm)
+
+        # in case of @once
+        if not following:
+            return dttm
+
+        if self.previous_schedule(following) != dttm:
+            return following
+
+        return dttm
+
     @property
     def tasks(self):
         return list(self.task_dict.values())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/docs/faq.rst
--
diff --git a/docs/faq.rst b/docs/faq.rst
index 21623fc..e61c1bf 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -78,12 +78,17 @@ We recommend against using dynamic values as ``start_date``, especially
 once the period closes, and in theory an ``@hourly`` DAG would never get to
 an hour after now as ``now()`` moves along.
 
-We also recommend using rounded ``start_date`` in relation to your
-``schedule_interval``. This means an 

[2/2] incubator-airflow git commit: Merge branch 'align_startdate'

2016-06-11 Thread bolke
Merge branch 'align_startdate'


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/901e8f2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/901e8f2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/901e8f2a

Branch: refs/heads/master
Commit: 901e8f2a95cda2bf90b3776df2e2565cfca7925e
Parents: b7def7f f69eec3
Author: Bolke de Bruin 
Authored: Sat Jun 11 13:47:27 2016 +0200
Committer: Bolke de Bruin 
Committed: Sat Jun 11 13:47:27 2016 +0200

--
 airflow/jobs.py   | 12 ++--
 airflow/models.py | 15 +++
 docs/faq.rst  | 15 ++-
 tests/jobs.py | 52 --
 4 files changed, 85 insertions(+), 9 deletions(-)
--




[jira] [Commented] (AIRFLOW-31) Use standard imports for hooks/operators

2016-06-11 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325763#comment-15325763
 ] 

Chris Riccomini commented on AIRFLOW-31:


Please have a look at AIRFLOW-200 and 
[this|https://github.com/apache/incubator-airflow/pull/1586] PR. While it does 
not use standard imports, it does improve the experience for the current 
import style, so that people can see why hooks/operators can't import, and it 
makes loads lazy.
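
Roughly, the idea is something like the following (a hypothetical sketch, not 
the actual code in that PR): keep the name-to-module mapping, but import 
lazily and surface the real ImportError instead of swallowing it.

import importlib
import logging

# Illustrative mapping only; the real dictionaries live in
# airflow/operators/__init__.py.
_OPERATOR_MODULES = {
    'PythonOperator': 'airflow.operators.python_operator',
    'HiveOperator': 'airflow.operators.hive_operator',
}

def load_operator(name):
    module_path = _OPERATOR_MODULES[name]
    try:
        module = importlib.import_module(module_path)
    except ImportError as err:
        # report why the operator is unavailable instead of hiding it
        logging.warning('Could not import %s: %s', module_path, err)
        raise
    return getattr(module, name)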

> Use standard imports for hooks/operators
> 
>
> Key: AIRFLOW-31
> URL: https://issues.apache.org/jira/browse/AIRFLOW-31
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: operators
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>  Labels: enhancement
> Fix For: Airflow 2.0
>
>
> (Migrated from https://github.com/airbnb/airflow/issues/1238)
> Currently, Airflow uses a relatively complex import mechanism to import hooks 
> and operators without polluting the namespace with submodules. I would like 
> to propose that Airflow abandon that system and use standard Python importing.
> Here are a few major reasons why I think the current system has run its 
> course.
> h3. Polluting namespace
> The biggest advantage of the current system, as I understand it, is that only 
> Operators appear in the `airflow.operators` namespace.  The submodules that 
> actually contain the operators do not.
> So for example while `airflow.operators.python_operator.PythonOperator` is a 
> thing, `PythonOperator` is in the `airflow.operators` namespace but 
> `python_operator` is not.
> I think this sort of namespace pollution was helpful when Airflow was a 
> smaller project, but as the number of hooks/operators grows -- and especially 
> as the `contrib` hooks/operators grow -- I'd argue that namespacing is a 
> *good thing*. It provides structure and organization, and opportunities for 
> documentation (through module docstrings).
> In fact, I'd argue that the current namespace is itself getting quite 
> polluted -- the only way to know what's available is to use something like 
> IPython tab-completion to browse an alphabetical list of Operator names, or 
> to load the source file and grok the import definition (which no one 
> installing from pypi is likely to do).
> h3. Conditional imports
> There's a second advantage to the current system that any module that fails 
> to import is silently ignored. It makes it easy to have optional 
> dependencies. For example, if someone doesn't have `boto` installed, then 
> they don't have an `S3Hook` either. The same goes for a `HiveOperator`.
> Again, as Airflow grows and matures, I think this is a little too magic. If 
> my environment is missing a dependency, I want to hear about it.
> On the other hand, the `contrib` namespace sort of depends on this -- we 
> don't want users to have to install every single dependency. So I propose 
> that contrib modules all live in their submodules: `from 
> airflow.contrib.operators.my_operator import MyOperator`. As mentioned 
> previously, having structure and namespacing is a good thing as the project 
> gets more complex.
> Other ways to handle this include putting "non-standard" dependencies inside 
> the operator/hook rather than the module (see `HiveOperator`/`HiveHook`), so 
> it can be imported but not used. Another is judicious use of `try`/`except 
> ImportError`. The simplest is to make people import things explicitly from 
> submodules.
> h3. Operator dependencies
> Right now, operators can't depend on each other if they aren't in the same 
> file. This is for the simple reason that there is no guarantee on what order 
> the operators will be loaded. It all comes down to which dictionary key gets 
> loaded first. One day Operator B could be loaded after Operator A; the next 
> day it might be loaded before. Consequently, A and B can't depend on each 
> other. Worse, if a user makes two operators that do depend on each other, 
> they won't get an error message when one fails to import.
> For contrib modules in particular, this is sort of killer.
> h3. Ease of use
> It's *hard* to set up imports for a new operator. The dictionary-based import 
> instructions aren't obvious for new users, and errors are silently dismissed 
> which makes debugging difficult.
> h3. Identity
> Surprisingly, `airflow.operators.SubDagOperator != 
> airflow.operators.subdag_operator.SubDagOperator`. See #1168.
> h2. Proposal
> Use standard python importing for hooks/operators/etc.
> - `__init__.py` files use straightforward, standard Python imports
> - major operators are available at `airflow.operators.OperatorName` or 
> `airflow.operators.operator_module.OperatorName`.
> - contrib operators are only available at 
> `airflow.contrib.operators.operator_module.OperatorName` in order