Re: Disable Processing of DAG file

2018-05-28 Thread Ananth Durai
It is an interesting question. On a slightly related note, and correct me if
I'm wrong, AFAIK we require restarting the Airflow scheduler in order for it to
pick up any new DAG file changes. In that case, should the scheduler do the
DAG file processing every time before scheduling the tasks?

Regards,
Ananth.P,






On 28 May 2018 at 21:46, ramandu...@gmail.com  wrote:

> Hi All,
>
> We have a use case where there would be 100(s) of DAG files with schedule
> set to "@once". Currently it seems that scheduler processes each and every
> file and creates a Dag Object.
> Is there a way or config to tell scheduler to stop processing certain
> files.
>
> Thanks,
> Raman Gupta
>


Disable Processing of DAG file

2018-05-28 Thread ramandumcs
Hi All,

We have a use case where there would be hundreds of DAG files with schedule set
to "@once". Currently it seems that the scheduler processes each and every file
and creates a DAG object.
Is there a way or a config setting to tell the scheduler to stop processing
certain files?

Thanks,
Raman Gupta


Re: HttpSensor raising exception with status=403

2018-05-28 Thread Pedro Machado
Thanks! I ended up creating a plugin and it's working OK.
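For illustration, a minimal sketch of the custom-sensor route Fokko suggests
(and that Pedro presumably packaged as a plugin), assuming a 1.9-era import
layout; the class name and the "succeed only on 200" check are made up here:

    import requests
    from airflow.exceptions import AirflowException
    from airflow.hooks.http_hook import HttpHook
    from airflow.operators.sensors import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults


    class CustomHttpSensor(BaseSensorOperator):
        """Like HttpSensor, but treats an error status (e.g. the 403 returned
        while the file is not there yet) as not-ready instead of failing."""

        @apply_defaults
        def __init__(self, endpoint, http_conn_id='http_default', headers=None,
                     *args, **kwargs):
            super(CustomHttpSensor, self).__init__(*args, **kwargs)
            self.endpoint = endpoint
            self.http_conn_id = http_conn_id
            self.headers = headers or {}

        def poke(self, context):
            hook = HttpHook(method='GET', http_conn_id=self.http_conn_id)
            try:
                response = hook.run(self.endpoint, headers=self.headers)
            except (AirflowException, requests.exceptions.HTTPError):
                # hook.run() calls raise_for_status(); a 403/404 lands here and
                # the sensor simply keeps poking.
                return False
            return response.status_code == 200

Registered through a plugin, or simply left importable on the PYTHONPATH, this
avoids patching the installed Airflow.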


On Mon, May 28, 2018 at 9:22 AM Driesprong, Fokko 
wrote:

> Hi Pedro,
>
> You could just create a CustomHttpHook and place it on your pythonpath,
> then you should also create a CustomHttpSensor. Hope this helps.
>
> Cheers, Fokko
>
> 2018-05-26 2:48 GMT+02:00 Pedro Machado :
>
> > Hi,
> >
> > I am using HttpSensor to look for a file. The webserver is returning 403
> > (instead of 404) while the file is not available. This is causing the
> > sensor to raise an exception.
> >
> > I see that a recent commit added the ability to disable the call to
> > response.raise_for_status() on the http hook by passing
> > extra_options={'check_response': False} to the sensor.
> >
> > https://github.com/apache/incubator-airflow/commit/
> > 6c19468e0b3b938249acc43e4b833a753d093efc?diff=unified
> >
> > I am unable to upgrade airflow. What would be the best way to incorporate
> > the new code, perhaps into a custom sensor?
> >
> > Thanks,
> >
> > Pedro
> >
>


Re: How to wait for external process

2018-05-28 Thread Stefan Seelmann
Thanks Christopher for the idea. That would work; we already have such a
"listener" that polls a queue (SQS) and creates the DAG runs. However, it
would have been nice to have the full process in one DAG, to get a better
overview of running jobs and to leverage the Gantt chart, but I think this
can be accomplished via custom plugins and views.

On 05/28/2018 08:43 PM, Christopher Bockman wrote:
> Haven't done this, but we'll have a similar need in the future, so have
> investigated a little.
> 
> What about a design pattern something like this:
> 
> 1) When jobs are done (ready for further processing) they publish those
> details to a queue (such as GC Pub/Sub or any other sort of queue)
> 
> 2) A single "listener" DAG sits and periodically checks that queue.  If it
> finds anything on it, it triggers (via DAG trigger) all of the DAGs which
> are on the queue.*
> 
> * = if your triggering volume is too high, this may cause airflow issues w/
> too many going at once; this could presumably be solved then via custom
> rate-limiting on firing these
> 
> 3) The listener DAG resets itself (triggers itself)
> 
> 
> On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko 
> wrote:
> 
>> Hi Stefan,
>>
>> Afaik there isn't a more efficient way of doing this. DAGs that are relying
>> on a lot of sensors are experiencing the same issues. The only way right
>> now, I can think of, is doing updating the state directly in the database.
>> But then you need to know what you are doing. I can image that this would
>> be feasible by using an AWS lambda function. Hope this helps.
>>
>> Cheers, Fokko
>>
>> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann :
>>
>>> Hello,
>>>
>>> I have a DAG (externally triggered) where some processing is done at an
>>> external system (EC2 instance). The processing is started by an Airflow
>>> task (via HTTP request). The DAG should only continue once that
>>> processing is completed. In a first naive implementation I created a
>>> sensor that gets the progress (via HTTP request) and only if status is
>>> "finished" returns true and the DAG run continues. That works but...
>>>
>>> ... the external processing can take hours or days, and during that time
>>> a worker is occupied which does nothing but HTTP GET and sleep. There
>>> will be hundreds of DAG runs in parallel which means hundreds of workers
>>> are occupied.
>>>
>>> I looked into other operators that do computation on external systems
>>> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
>>> just wait/sleep.
>>>
>>> So I want to ask if there is a more efficient way to build such a
>>> workflow with Airflow?
>>>
>>> Kind Regards,
>>> Stefan
>>>
>>
> 



Re: Alert Emails Templatizing

2018-05-28 Thread Ananth Durai
It is a bit tricky.
*Step 1:*
You can write an SLA miss callback, send the email from the callback, and empty
the `slas` object so that Airflow won't send its own SLA miss email.
https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L688

*Step 2:*
You can reuse Airflow's `send_email` method here:
https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L61

*Step 3:*
If you are sending the SLA miss notification from your callback, you need to
mutate the `sla_miss` table yourself, just like
https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L726

I hope this will get simplified in future releases.
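For reference, a hedged sketch of how those three steps could fit together
against a 1.9-era jobs.py; the callback signature and the trick of emptying
`slas` in place come from reading that file, and the recipient address, DAG id,
and email body are placeholders:

    from datetime import datetime

    from airflow import DAG, settings
    from airflow.utils.email import send_email


    def custom_sla_miss_callback(dag, task_list, blocking_task_list, slas,
                                 blocking_tis):
        missed = list(slas)

        # Step 2: send our own, templated notification via Airflow's send_email.
        send_email(
            to=['oncall@example.com'],
            subject='[airflow] SLA miss on DAG=' + dag.dag_id,
            html_content='Tasks that missed their SLA:<pre>{}</pre>'.format(task_list),
        )

        # Step 1: empty `slas` *in place* so the stock SLA email, which is
        # guarded by len(slas) in jobs.py, is not sent as well.
        del slas[:]

        # Step 3: with `slas` emptied, the scheduler no longer marks these rows
        # as notified, so update the sla_miss table ourselves (cf. jobs.py#L726).
        session = settings.Session()
        for sla in missed:
            sla.email_sent = True
            sla.notification_sent = True
            session.merge(sla)
        session.commit()
        session.close()


    dag = DAG(
        dag_id='my_dag_with_custom_sla_email',
        start_date=datetime(2018, 1, 1),
        schedule_interval='@daily',
        sla_miss_callback=custom_sla_miss_callback,
    )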

Regards,
Ananth.P,






On 28 May 2018 at 05:48, vardangupta...@gmail.com 
wrote:

> Hi team,
>
> We had a use case where we wanted to serve different email body to
> different use cases at the time of failure & up_for_retry, currently body
> seems to be hard coded in models.py, Is there any plan to make it
> templatized in upcoming future or it will be a good idea if we come across
> with code change and contribute? Please suggest recommended way of
> implementing the feature.
>
>
> Regards,
> Vardan Gupta
>


Re: How to wait for external process

2018-05-28 Thread Ananth Durai
Since you are already on AWS, the simplest thing I can think of is to write a
signal file once the job finishes and have the downstream job wait for that
signal file. In other words, the same pattern as Hadoop jobs writing a
`_SUCCESS` file, with downstream jobs depending on that signal file.
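As a rough illustration of that pattern (bucket, key layout, and intervals are
made up), the downstream DAG could wait on the marker with the stock
S3KeySensor:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.sensors import S3KeySensor

    dag = DAG('wait_for_external_job', start_date=datetime(2018, 1, 1),
              schedule_interval=None)

    wait_for_signal = S3KeySensor(
        task_id='wait_for_success_marker',
        bucket_name='my-jobs-bucket',              # placeholder bucket
        bucket_key='output/{{ ds }}/_SUCCESS',     # marker written by the job
        poke_interval=300,                         # check every 5 minutes
        timeout=60 * 60 * 24,                      # give up after a day
        dag=dag,
    )

Note that such a sensor still occupies a worker slot while it pokes, which was
Stefan's original concern, so it mostly makes sense with a generous
poke_interval or combined with the listener/trigger pattern discussed elsewhere
in this thread.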

Regards,
Ananth.P,






On 28 May 2018 at 13:06, Stefan Seelmann  wrote:

> Thanks Christopher for the idea. That would work, we already have such a
> "listener" that polls a queue (SQS) and creates the DAG runs. However it
> would have been nice to have the full process in one DAG to have a
> better overview about running jobs and leverage the gantt chart, but I
> think this can be accomplished via custom plugins and views.
>
> On 05/28/2018 08:43 PM, Christopher Bockman wrote:
> > Haven't done this, but we'll have a similar need in the future, so have
> > investigated a little.
> >
> > What about a design pattern something like this:
> >
> > 1) When jobs are done (ready for further processing) they publish those
> > details to a queue (such as GC Pub/Sub or any other sort of queue)
> >
> > 2) A single "listener" DAG sits and periodically checks that queue.  If
> it
> > finds anything on it, it triggers (via DAG trigger) all of the DAGs which
> > are on the queue.*
> >
> > * = if your triggering volume is too high, this may cause airflow issues
> w/
> > too many going at once; this could presumably be solved then via custom
> > rate-limiting on firing these
> >
> > 3) The listener DAG resets itself (triggers itself)
> >
> >
> > On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko  >
> > wrote:
> >
> >> Hi Stefan,
> >>
> >> Afaik there isn't a more efficient way of doing this. DAGs that are
> relying
> >> on a lot of sensors are experiencing the same issues. The only way right
> >> now, I can think of, is doing updating the state directly in the
> database.
> >> But then you need to know what you are doing. I can image that this
> would
> >> be feasible by using an AWS lambda function. Hope this helps.
> >>
> >> Cheers, Fokko
> >>
> >> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann :
> >>
> >>> Hello,
> >>>
> >>> I have a DAG (externally triggered) where some processing is done at an
> >>> external system (EC2 instance). The processing is started by an Airflow
> >>> task (via HTTP request). The DAG should only continue once that
> >>> processing is completed. In a first naive implementation I created a
> >>> sensor that gets the progress (via HTTP request) and only if status is
> >>> "finished" returns true and the DAG run continues. That works but...
> >>>
> >>> ... the external processing can take hours or days, and during that
> time
> >>> a worker is occupied which does nothing but HTTP GET and sleep. There
> >>> will be hundreds of DAG runs in parallel which means hundreds of
> workers
> >>> are occupied.
> >>>
> >>> I looked into other operators that do computation on external systems
> >>> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
> >>> just wait/sleep.
> >>>
> >>> So I want to ask if there is a more efficient way to build such a
> >>> workflow with Airflow?
> >>>
> >>> Kind Regards,
> >>> Stefan
> >>>
> >>
> >
>
>


Ability to discover custom plugins, operators, sensors, etc. from various locations

2018-05-28 Thread Ritesh Shrivastav
Hi,

I was trying to find out if it's possible to create custom plugins without
dropping them into `$AIRFLOW_HOME/plugins` (or the directory defined in
`airflow.cfg`).
We can define one location in `airflow.cfg`, but I have multiple projects
which will have their own workflows, so ideally I would want to implement
custom plugins (along with operators, sensors, etc.) in those repositories
themselves and let them be discovered from there.
Checking the documentation and browsing the code, I concluded that the only
way to get this done is by ensuring that `$PYTHONPATH` is set to include the
directories where these custom modules are stored.

So, I want to ask if there is a better way to handle this?
If not, can `plugins_manager` (AirflowPlugin) be improved to suit this
use case?


Kind Regards,
Ritesh


Re: How to wait for external process

2018-05-28 Thread Christopher Bockman
Haven't done this, but we'll have a similar need in the future, so have
investigated a little.

What about a design pattern something like this:

1) When jobs are done (ready for further processing) they publish those
details to a queue (such as GC Pub/Sub or any other sort of queue)

2) A single "listener" DAG sits and periodically checks that queue.  If it
finds anything on it, it triggers (via DAG trigger) all of the DAGs which
are on the queue.*

* = if your triggering volume is too high, this may cause airflow issues w/
too many going at once; this could presumably be solved then via custom
rate-limiting on firing these

3) The listener DAG resets itself (triggers itself)
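For concreteness, a rough sketch of such a listener, assuming an SQS queue and
the experimental trigger_dag API available in 1.9-era Airflow (queue URL,
message format, and DAG ids are made up), and polling on a short schedule
rather than literally re-triggering itself:

    import json
    from datetime import datetime

    import boto3
    from airflow import DAG
    from airflow.api.common.experimental.trigger_dag import trigger_dag
    from airflow.operators.python_operator import PythonOperator


    def drain_queue_and_trigger(**_):
        sqs = boto3.client('sqs')
        queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/finished-jobs'
        while True:
            messages = sqs.receive_message(
                QueueUrl=queue_url, MaxNumberOfMessages=10).get('Messages', [])
            if not messages:
                break
            for msg in messages:
                # e.g. {"dag_id": "process_result", "job_id": "abc123"}
                body = json.loads(msg['Body'])
                trigger_dag(
                    dag_id=body['dag_id'],
                    run_id='sqs__' + body['job_id'],
                    conf=json.dumps(body),
                )
                sqs.delete_message(QueueUrl=queue_url,
                                   ReceiptHandle=msg['ReceiptHandle'])


    dag = DAG(
        dag_id='queue_listener',
        start_date=datetime(2018, 1, 1),
        schedule_interval='*/5 * * * *',   # poll instead of self-triggering
        catchup=False,
    )

    PythonOperator(task_id='drain_and_trigger',
                   python_callable=drain_queue_and_trigger,
                   provide_context=True, dag=dag)

The rate-limiting caveat above could be approximated by capping how many
messages are drained per run.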


On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko 
wrote:

> Hi Stefan,
>
> Afaik there isn't a more efficient way of doing this. DAGs that are relying
> on a lot of sensors are experiencing the same issues. The only way right
> now, I can think of, is doing updating the state directly in the database.
> But then you need to know what you are doing. I can image that this would
> be feasible by using an AWS lambda function. Hope this helps.
>
> Cheers, Fokko
>
> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann :
>
> > Hello,
> >
> > I have a DAG (externally triggered) where some processing is done at an
> > external system (EC2 instance). The processing is started by an Airflow
> > task (via HTTP request). The DAG should only continue once that
> > processing is completed. In a first naive implementation I created a
> > sensor that gets the progress (via HTTP request) and only if status is
> > "finished" returns true and the DAG run continues. That works but...
> >
> > ... the external processing can take hours or days, and during that time
> > a worker is occupied which does nothing but HTTP GET and sleep. There
> > will be hundreds of DAG runs in parallel which means hundreds of workers
> > are occupied.
> >
> > I looked into other operators that do computation on external systems
> > (ECSOperator, AWSBatchOperator) but they also follow that pattern and
> > just wait/sleep.
> >
> > So I want to ask if there is a more efficient way to build such a
> > workflow with Airflow?
> >
> > Kind Regards,
> > Stefan
> >
>


Alert Emails Templatizing

2018-05-28 Thread vardanguptacse
Hi team,

We have a use case where we want to serve a different email body for different
use cases at the time of failure & up_for_retry. Currently the body seems to be
hard-coded in models.py. Is there any plan to make it templatized in an
upcoming release, or would it be a good idea for us to make the code change and
contribute it? Please suggest the recommended way of implementing this feature.


Regards,
Vardan Gupta


Re: HttpSensor raising exception with status=403

2018-05-28 Thread Driesprong, Fokko
Hi Pedro,

You could just create a CustomHttpHook and place it on your PYTHONPATH, and
then also create a CustomHttpSensor that uses it. Hope this helps.

Cheers, Fokko

2018-05-26 2:48 GMT+02:00 Pedro Machado :

> Hi,
>
> I am using HttpSensor to look for a file. The webserver is returning 403
> (instead of 404) while the file is not available. This is causing the
> sensor to raise an exception.
>
> I see that a recent commit added the ability to disable the call to
> response.raise_for_status() on the http hook by passing
> extra_options={'check_response': False} to the sensor.
>
> https://github.com/apache/incubator-airflow/commit/
> 6c19468e0b3b938249acc43e4b833a753d093efc?diff=unified
>
> I am unable to upgrade airflow. What would be the best way to incorporate
> the new code, perhaps into a custom sensor?
>
> Thanks,
>
> Pedro
>


Re: How to wait for external process

2018-05-28 Thread Driesprong, Fokko
Hi Stefan,

Afaik there isn't a more efficient way of doing this. DAGs that rely on a lot
of sensors experience the same issue. The only way right now that I can think
of is updating the state directly in the database, but then you need to know
what you are doing. I can imagine that this would be feasible using an AWS
Lambda function. Hope this helps.

Cheers, Fokko
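For a sense of what that could look like, a hedged sketch of flipping a waiting
task's state straight in the metadata database, e.g. from an AWS Lambda that
fires when the external job completes; the connection string and identifiers
are placeholders, the table layout is Airflow 1.x's task_instance schema, and
bypassing the scheduler like this is easy to get wrong:

    from sqlalchemy import create_engine, text

    engine = create_engine('postgresql://airflow:airflow@airflow-db/airflow')

    with engine.begin() as conn:
        conn.execute(
            text(
                "UPDATE task_instance SET state = 'success' "
                "WHERE dag_id = :dag_id AND task_id = :task_id "
                "AND execution_date = :execution_date"
            ),
            {
                'dag_id': 'external_processing',
                'task_id': 'wait_for_external_job',
                'execution_date': '2018-05-28 00:00:00',
            },
        )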

2018-05-26 17:50 GMT+02:00 Stefan Seelmann :

> Hello,
>
> I have a DAG (externally triggered) where some processing is done at an
> external system (EC2 instance). The processing is started by an Airflow
> task (via HTTP request). The DAG should only continue once that
> processing is completed. In a first naive implementation I created a
> sensor that gets the progress (via HTTP request) and only if status is
> "finished" returns true and the DAG run continues. That works but...
>
> ... the external processing can take hours or days, and during that time
> a worker is occupied which does nothing but HTTP GET and sleep. There
> will be hundreds of DAG runs in parallel which means hundreds of workers
> are occupied.
>
> I looked into other operators that do computation on external systems
> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
> just wait/sleep.
>
> So I want to ask if there is a more efficient way to build such a
> workflow with Airflow?
>
> Kind Regards,
> Stefan
>


Re: Using Airflow with dataset dependant flows (not date)

2018-05-28 Thread Daniel (Daniel Lamblin) [BDP - Seoul]
This seemed like a very clear explanation of the JIRA ticket and the idea of
making DAG runs depend not on a schedule but on the arrival of a dataset.
I think a lot would have to change if the execution date were turned into a
generic parameterized value, and that's not the only thing that would have to
change to support a dataset trigger.

Thinking about the video encoding example, it seems the Airflow way to roughly
do that would be to have the dataset DAGs depend on a frequently scheduled DAG
that runs just a TriggerDagRunOperator, whose Python callable polls for new
datasets (or subscribes to a queue of updates about them) and then decides
which DAG id to trigger for a particular dataset and what dag_run_obj.payload
should be, to inform it of the right dataset to run on.
You might want to write a plugin that gives a different kind of tree view for
the DAGs that get triggered this way, so that you can easily see the dataset
and payload specifics in the overview of the runs.

There's an example of triggering a dag with an assigned payload:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py
And an example of the triggered DAG using the payload:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py

The latter part works the same way as when a CLI-triggered DAG accepts a conf
object.
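Condensed, the controller/target pairing from those example DAGs looks roughly
like this (both in one file for brevity; the dataset-polling helper, DAG ids,
and payload fields are made up):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    from airflow.operators.python_operator import PythonOperator


    def find_new_dataset():
        """Stub: replace with your own polling or queue check."""
        return None


    def maybe_trigger(context, dag_run_obj):
        dataset = find_new_dataset()
        if dataset is None:
            return None                      # returning None skips the trigger
        dag_run_obj.payload = {'dataset': dataset}
        return dag_run_obj


    controller = DAG('dataset_trigger_controller',
                     start_date=datetime(2018, 1, 1),
                     schedule_interval='*/10 * * * *', catchup=False)

    TriggerDagRunOperator(task_id='trigger_dataset_dag',
                          trigger_dag_id='process_dataset',
                          python_callable=maybe_trigger, dag=controller)

    target = DAG('process_dataset', start_date=datetime(2018, 1, 1),
                 schedule_interval=None)


    def process(**context):
        # the payload set above arrives as dag_run.conf in the target DAG
        dataset = context['dag_run'].conf['dataset']
        print('processing', dataset)


    PythonOperator(task_id='process', python_callable=process,
                   provide_context=True, dag=target)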

The experimental API also contains a way of triggering with a conf object:
https://github.com/apache/incubator-airflow/blob/master/airflow/www/api/experimental/endpoints.py#L42
So if you wanted to skip the high-frequency trigger controller DAG and used
some kind of queue, such as an SQS queue to which you could subscribe an HTTPS
trigger, then the queue system could trigger a target DAG through the API.
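A call against that experimental endpoint, e.g. from whatever consumes the
queue, might look like the following sketch (webserver URL and conf are
placeholders, and depending on the Airflow version the conf value may need to
be a JSON-encoded string rather than an object):

    import requests

    resp = requests.post(
        'http://airflow-webserver:8080/api/experimental/dags/process_dataset/dag_runs',
        json={'conf': {'dataset': 'sample_42'}},
    )
    resp.raise_for_status()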

Does this help you use Airflow more concretely for your needs, or are you
looking to fill in a feature for some part of the roadmap that doesn't yet
exist?
-Daniel

On 5/18/18, 4:52 PM, "Javier Domingo Cansino"  wrote:

Hello Guys,

First of all, I have submitted the idea to JIRA[1], and after speaking with
the guys on Gitter, they told me to bring the discussion here too.

Right now Airflow only understands being a date-based scheduler. It is
extremely complete in that sense, and makes it really easy to populate and
backfill your DAGs. Monitoring is quite decent, and can be improved through
plugins. Everything is code, as opposed to most of the alternatives out
there[2][3][4], and you may or may not depend on files existing to go to the
next step. There is a UI that lets you visualize the status of your systems
and trigger jobs manually.

There is a limitation, however, in running on dates only: sometimes there are
DAGs that do not depend on the date but on the dataset. Some examples I am
close to:

  * Bioinf pipeline, where you process samples

  * Media pipeline, where you may process different videos/audios in the
same way

Right now I am using Snakemake for the first one and bash scripts for the
second one; however, I have thought that maybe Airflow could be a solution to
these two problems.

I have been reading the code, and although the term execution_date is quite
tightly coupled, it seems doable to abstract the datatype of this
parametrization variable (datetime) and extend it to something that could
depend on something else (a string).

After all, from what I have seen, execution_date is just the parametrization
variable.

Questions I would like to ask:

  * Is this some need you have had? If so, how did you solve it? Is there
    any other tool with the features I described that could help me with that?

  * How do you recommend solving this with Airflow?

* On Gitter, people have proposed forgetting about execution_dates, just
  triggering the DAGs and parametrizing the run through variables. However,
  this has the drawback of losing execution tracking, and it makes it
  impossible to run several DAG runs at the same time for different datasets.

* There was also the proposal to instantiate subDAGs per dataset, and to have
  one DAG where the first step is to read which samples to run on. The problem
  I see with this is that you lose track of which samples have been run, and
  you cannot have per-sample historic data.

* Airflow works well when you have datasets that change; therefore, another
  solution would be to instantiate one DAG per sample and then have a single
  execution. However, this sounds a bit overkill to me, because you would just
  have one DAGRun per DAG.

  * If this is something that would