Re: Submitting 1000+ tasks to airflow programmatically

2018-03-21 Thread Chris Fei
@Kyle, I do something similar and have run into the problems you've
mentioned. In my case, I access data from S3 and then generate separate
DAGs (of different structures) based on the data that's pulled. I've
also found that the UI for accessing a single large DAG is slow so I
prefer to keep many separate DAGs. What I'd try is to define a DAG
that's responsible for accessing your API and caching the client IDs
somewhere locally, maybe just to a file on disk or as an Airflow
Variable. You can run this DAG on whatever schedule is appropriate for
you. From there, build a function that creates a DAG, and for each client
ID register the DAG it returns in the global context. Like this:
from datetime import datetime
from airflow import DAG

def create_client_dag(client_id):
    # build the dag here, e.g.:
    dag = DAG(
        dag_id='client_dag_{}'.format(client_id),
        start_date=datetime(2018, 1, 1),
    )
    return dag

def get_client_ids_locally():
    # access the data that was pulled from the API
    ...

client_ids = get_client_ids_locally()
for client in client_ids:
    dag = create_client_dag(client)
    globals()[dag.dag_id] = dag
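The caching DAG itself can be a single PythonOperator that writes the API
response to an Airflow Variable. A minimal sketch, assuming a hypothetical
API endpoint and Variable name:

from datetime import datetime
import json

import requests
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

def cache_client_ids():
    # Hypothetical endpoint; replace with your real API.
    ids = requests.get('https://api.example.com/clients').json()
    Variable.set('client_ids', json.dumps(ids))

dag = DAG(
    dag_id='cache_client_ids',
    start_date=datetime(2018, 1, 1),
    schedule_interval='@hourly',  # whatever refresh rate is appropriate
)

PythonOperator(
    task_id='cache_client_ids',
    python_callable=cache_client_ids,
    dag=dag,
)

get_client_ids_locally() then just reads the Variable back, e.g.
json.loads(Variable.get('client_ids')).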

This approach also handles removing client IDs somewhat gracefully. DAGs
for removed clients will still appear in the UI (you can build a
maintenance DAG to clean that up), but they'll be disabled and their
tasks won't be scheduled.

Re: Submitting 1000+ tasks to airflow programmatically

2018-03-21 Thread Kyle Hamlin
Thanks for all the responses; let me try to address the main themes.

@Ace @Nicholas @Taylor
I originally started with a loop over my list of client ids and created a
SparkSubmitOperator for each client. The pseudo code would look something
like this:

dag = DAG(...)

client_ids = get_client_ids()
for client_id in client_ids:
    SparkSubmitOperator(
        ...,
        dag=dag,
    )

I found this approach kind of clunky for a few reasons. First, the
get_client_ids() function was hitting our API every time the dag was read
by the scheduler, which seemed excessive (every 30 seconds or so?). Second,
it seemed like a single task failure marked the whole dag as a failure,
though I guess retrying till the task worked could solve this? Third,
the UI gets really clunky and slow, basically unusable, when it tries to
render the graph view for that many tasks. Finally, Airflow doesn't seem
very happy when client_ids are removed, i.e. when get_client_ids() no longer
returns a specific client_id; it really seems to want a static dag.

Do I really have to poll an API or database every 30 seconds for this
dynamic client_id data?

@Ace
I have been limiting concurrency so as not to blast the cluster.

@Nicholas
Thank you for the noise suggestion; I will definitely implement that if I
continue with the same methodology.

@Taylor
Are you using a SubDagOperator? Or is your process similar to the
pseudo code I wrote above?


On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston wrote:

> We also use a similar approach to generate dynamic DAGs based on a common
> template DAG file.  We pull in the list of config objects, one per DAG,
> from an internal API lightly wrapping the database, then we cache that
> response in an Airflow Variable that gets updated once a minute.  The
> dynamic DAGs are generated from that variable.
>
> *Taylor Edmiston*
> TEdmiston.com | Blog | Stack Overflow CV | LinkedIn | AngelList
>
>
> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <nicolas.ki...@weightwatchers.com> wrote:
>
> > Kyle,
> >
> > We have a similar approach but on a much, much smaller scale. We now have
> > <100 “things to process” but expect it to grow to under ~200.  Each “thing
> > to process” has the same workflow so we have a single DAG definition that
> > does about 20 tasks per, then we loop over the list of items and produce a
> > dag object for each one adding it to the global definition.
> >
> > One of the things we quickly ran into was crushing the scheduler as
> > everything was running with the same start time.  To get around this we add
> > noise to the start time minute and seconds. Simply index % 60.  This
> > spreads out the load so that the scheduler isn’t trying to run everything
> > at the exact same moment.  I would suggest if you do go this route, to also
> > stagger your hours if you can because of how many you plan to run.  Perhaps
> > your DAGs are smaller and aren’t as CPU intensive as ours.
> >
> > On 3/21/18, 1:35 PM, "Kyle Hamlin" wrote:
> >
> > Hello,
> >
> > I'm currently using Airflow for some ETL tasks where I submit a spark job
> > to a cluster and poll till it is complete. This workflow is nice because it
> > is typically a single Dag. I'm now starting to do more machine learning
> > tasks and need to build a model per client, which means 1000+ clients. My
> > spark cluster is capable of handling this workload; however, it doesn't
> > seem scalable to write 1000+ dags to fit models for each client. I want
> > each client to have its own task instance so it can be retried if it
> > fails without having to run all 1000+ tasks over again. How do I handle
> > this type of workflow in Airflow?


Use existing EMR cluster if it exists

2018-03-21 Thread Sam Sen
It takes roughly ten minutes to spin up a cluster of one worker and a
master. Each time we want to run code through EMR, our developers have to
wait before they can submit steps. The idea is to leave standby clusters,
but they would terminate if they're idle for more than X minutes/hours (we
haven't decided yet).

Is there existing open source code that I can use or reference? I figure
EmrCreateJobFlowOperator is what I would need to extend.
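For what it's worth, a rough sketch of that extension (EmrCreateJobFlowOperator
and EmrHook are real Airflow contrib classes; the reuse logic and class name
below are untested assumptions):

from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.contrib.operators.emr_create_job_flow_operator import (
    EmrCreateJobFlowOperator,
)

class EmrReuseOrCreateJobFlowOperator(EmrCreateJobFlowOperator):
    # Return the id of an idle (WAITING) cluster if one exists;
    # otherwise fall back to creating a new job flow.
    def execute(self, context):
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
        waiting = emr.list_clusters(ClusterStates=['WAITING'])['Clusters']
        if waiting:
            return waiting[0]['Id']
        return super(EmrReuseOrCreateJobFlowOperator, self).execute(context)

The idle-timeout termination would still need a separate maintenance job to
reap clusters that sit in WAITING longer than whatever cutoff you settle on.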


Re: Submitting 1000+ tasks to airflow programmatically

2018-03-21 Thread Taylor Edmiston
We also use a similar approach to generate dynamic DAGs based on a common
template DAG file.  We pull in the list of config objects, one per DAG,
from an internal API lightly wrapping the database, then we cache that
response in an Airflow Variable that gets updated once a minute.  The
dynamic DAGs are generated from that variable.
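For illustration, the read side of that pattern might look like the sketch
below (assuming the cached response lives in a 'dag_configs' Variable holding
a JSON list; all names here are made up):

import json
from datetime import datetime

from airflow import DAG
from airflow.models import Variable

# One cheap metadata-DB read at parse time instead of an API call.
configs = json.loads(Variable.get('dag_configs', default_var='[]'))
for config in configs:
    dag = DAG(
        dag_id='templated_{}'.format(config['name']),
        start_date=datetime(2018, 1, 1),
    )
    # instantiate the template's tasks on dag here
    globals()[dag.dag_id] = dag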

*Taylor Edmiston*
TEdmiston.com | Blog | Stack Overflow CV | LinkedIn | AngelList



On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <nicolas.ki...@weightwatchers.com> wrote:

> Kyle,
>
> We have a similar approach but on a much, much smaller scale. We now have
> <100 “things to process” but expect it to grow to under ~200.  Each “thing
> to process” has the same workflow so we have a single DAG definition that
> does about 20 tasks per, then we loop over the list of items and produce a
> dag object for each one adding it to the global definition.
>
> One of the things we quickly ran into was crushing the scheduler as
> everything was running with the same start time.  To get around this we add
> noise to the start time minute and seconds. Simply index % 60.  This
> spreads out the load so that the scheduler isn’t trying to run everything
> at the exact same moment.  I would suggest if you do go this route, to also
> stagger your hours if you can because of how many you plan to run.  Perhaps
> your DAGs are smaller and aren’t as CPU intensive as ours.
>
> On 3/21/18, 1:35 PM, "Kyle Hamlin" wrote:
>
> Hello,
>
> I'm currently using Airflow for some ETL tasks where I submit a spark job
> to a cluster and poll till it is complete. This workflow is nice because it
> is typically a single Dag. I'm now starting to do more machine learning
> tasks and need to build a model per client, which means 1000+ clients. My
> spark cluster is capable of handling this workload; however, it doesn't
> seem scalable to write 1000+ dags to fit models for each client. I want
> each client to have its own task instance so it can be retried if it
> fails without having to run all 1000+ tasks over again. How do I handle
> this type of workflow in Airflow?


Re: Submitting 1000+ tasks to airflow programmatically

2018-03-21 Thread Nicolas Kijak
Kyle,

We have a similar approach but on a much, much smaller scale. We now have <100
“things to process” but expect it to grow to under ~200. Each “thing to
process” has the same workflow, so we have a single DAG definition that does
about 20 tasks per item; then we loop over the list of items and produce a dag
object for each one, adding it to the global definition.

One of the things we quickly ran into was crushing the scheduler, as everything
was running with the same start time. To get around this we add noise to the
start time minute and seconds: simply index % 60. This spreads out the load so
that the scheduler isn’t trying to run everything at the exact same moment. If
you do go this route, I would suggest also staggering your hours if you can,
because of how many you plan to run. Perhaps your DAGs are smaller and aren’t
as CPU intensive as ours.
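As a sketch, the staggering might look like this when the dag objects are
generated (get_items_to_process is a hypothetical stand-in for however the
list is loaded):

from datetime import datetime, timedelta

from airflow import DAG

items = get_items_to_process()  # hypothetical loader for the items
for index, item in enumerate(items):
    dag = DAG(
        dag_id='process_{}'.format(item),
        # index % 60 offsets the start minute and second so the scheduler
        # isn't asked to start every DAG at the exact same moment.
        start_date=datetime(2018, 1, 1, 0, index % 60, index % 60),
        schedule_interval=timedelta(days=1),
    )
    globals()[dag.dag_id] = dag

Staggering hours works the same way: derive the hour field from the index too.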

On 3/21/18, 1:35 PM, "Kyle Hamlin" wrote:

Hello,

I'm currently using Airflow for some ETL tasks where I submit a spark job
to a cluster and poll till it is complete. This workflow is nice because it
is typically a single Dag. I'm now starting to do more machine learning
tasks and need to build a model per client, which means 1000+ clients. My
spark cluster is capable of handling this workload; however, it doesn't
seem scalable to write 1000+ dags to fit models for each client. I want
each client to have its own task instance so it can be retried if it
fails without having to run all 1000+ tasks over again. How do I handle
this type of workflow in Airflow?




Re: Submitting 1000+ tasks to airflow programmatically

2018-03-21 Thread Ace Haidrey
Hi,
This is a neat use case - glad you’re using Airflow for it!
Out of curiosity, why don’t you create a single dag, call it
company_models_dag, and register each task with this dag? So for each company
you have a spark task to build a model. You can do this programmatically,
looping through each company (you probably have a list of them). This way, if
one company’s model fails you can just rerun that task; you don’t have to
rerun the entire dag, since you pick up from where it left off, i.e. the failed
task and its downstream tasks. One thing to consider: you’ll want to limit the
concurrency (unless you can indeed run 1000 tasks in your spark cluster at
once).
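As a concrete sketch of that layout (the company list, application path, and
concurrency cap below are all placeholders):

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

companies = ['acme', 'globex']  # placeholder; use your real list

dag = DAG(
    dag_id='company_models_dag',
    start_date=datetime(2018, 1, 1),
    concurrency=16,  # cap how many model builds run at once
)

for company in companies:
    SparkSubmitOperator(
        task_id='build_model_{}'.format(company),
        application='/path/to/build_model.py',  # placeholder
        application_args=[company],
        retries=2,  # a failed build retries on its own
        dag=dag,
    )

A failed task can also be cleared by hand, which reruns only that company's
model rather than the whole dag.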

Sent from my iPhone

> On Mar 21, 2018, at 10:34 AM, Kyle Hamlin wrote:
> 
> Hello,
> 
> I'm currently using Airflow for some ETL tasks where I submit a spark job
> to a cluster and poll till it is complete. This workflow is nice because it
> is typically a single Dag. I'm now starting to do more machine learning
> tasks and need to build a model per client, which means 1000+ clients. My
> spark cluster is capable of handling this workload; however, it doesn't
> seem scalable to write 1000+ dags to fit models for each client. I want
> each client to have its own task instance so it can be retried if it
> fails without having to run all 1000+ tasks over again. How do I handle
> this type of workflow in Airflow?


Submitting 1000+ tasks to airflow programmatically

2018-03-21 Thread Kyle Hamlin
Hello,

I'm currently using Airflow for some ETL tasks where I submit a spark job
to a cluster and poll till it is complete. This workflow is nice because it
is typically a single Dag. I'm now starting to do more machine learning
tasks and need to build a model per client, which means 1000+ clients. My
spark cluster is capable of handling this workload; however, it doesn't
seem scalable to write 1000+ dags to fit models for each client. I want
each client to have its own task instance so it can be retried if it
fails without having to run all 1000+ tasks over again. How do I handle
this type of workflow in Airflow?


Re: [VOTE] [RESULT] Migrate to Github as primary repo (a.k.a. Gitbox)

2018-03-21 Thread Chris Riccomini
Great, thanks for doing this Ash!


[VOTE] [RESULT] Migrate to Github as primary repo (a.k.a. Gitbox)

2018-03-21 Thread Ash Berlin-Taylor
The vote passed with the following +1's, no -1.

4+ Binding (PPMC) votes
Ash Berlin-Taylor, Chris Riccomini, Joy Gao, Maxime Beauchemin

4+ Non-binding (community) votes

George Leslie-Waksman, Shah Altaf, Matthew Housley, Beau Barker

I have created https://issues.apache.org/jira/browse/AIRFLOW-2238 to say we
need to update the PR tool. I'm not sure how much time I will have to do this
work over the next few days/weeks, but if no one else picks it up I'll try and
get around to it. I think the plan would be: write this feature, get it
reviewed, but don't merge it, ask the ASF Infra team to migrate, then merge in
this change.

-ash


> On 14 Mar 2018, at 21:49, George Leslie-Waksman wrote:
> 
> +1 (non-binding)
> 
> On Mon, Mar 12, 2018 at 11:51 AM Shah Altaf wrote:
> 
>> +1 (non binding)
>> 
>> @Beau Barker - correct me if I'm wrong - I believe that issues will still
>> remain on Jira.  Github will be used just for source control and pull
>> requests.  In the Kylin example, the commit messages contain the Jira
>> ticket IDs.  See:  https://github.com/apache/kylin/commits/master
>> 
>> On Mon, Mar 12, 2018 at 6:25 PM Jakob Homan wrote:
>> 
>>>> +1 (binding)
>>>> 
>>>> For future reference, is this vote for anyone on the mailing list, or for
>>>> those with some kind of status in the project?
>>> 
>>> Matthew - yeah, binding votes are reserved for committers or PMC
>>> members (depending on the vote).  Everyone in the community is
>>> encouraged to vote, and those with binding votes are expected to pay
>>> attention to those votes that aren't binding (ie, don't vote something
>>> through that the larger community is angry about or has noticed
>>> significant problems with), but in the end, it's the binding votes
>>> that actually count.  Researching a question and voting is a form of
>>> contribution to the project, so it's never wasted.  Brett has a good
>>> slide on the general way this works:
>>> https://www.slideshare.net/Hadoop_Summit/the-apache-way-80377908
>>> 
>>> -Jakob
>>> 
>>> On 12 March 2018 at 10:23, Chris Riccomini wrote:
>>>> +1
>>>> 
>>>> On Sat, Mar 10, 2018 at 11:18 PM, Maxime Beauchemin
>>>> <maximebeauche...@gmail.com> wrote:
>>>> 
>>>>> +1 (binding)
>>>>> 
>>>>> On Sat, Mar 10, 2018 at 2:03 PM, Beau Barker <beauinmelbou...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> +1 for Github.
>>>>>> 
>>>>>> Also think that moving to Github issues would be a step in the right
>>>>>> direction.
>>>>>> 
>>>>>>> On 11 Mar 2018, at 05:56, Matthew Housley <matthew.hous...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>> +1 (binding)
>>>>>>> 
>>>>>>> For future reference, is this vote for anyone on the mailing list, or
>>>>>>> for those with some kind of status in the project? I find the
>>>>>>> documentation here a little ambiguous:
>>>>>>> https://httpd.apache.org/dev/guidelines.html#voting
>>>>>>> Apologies if this has been answered before.
>>>>>>> 
>>>>>>> On Sat, Mar 10, 2018 at 8:29 AM Ash Berlin-Taylor <
>>>>>>> ash_airflowl...@firemirror.com> wrote:
>>>>>>> 
>>>>>>>> Consider this my +1 (binding) vote for the below proposal. This vote
>>>>>>>> will run for 7 days (until 2018-03-17 15:30+00)
>>>>>>>> 
>>>>>>>> **Proposal**: We switch to using GitHub as our primary repo
>>>>>>>> 
>>>>>>>> We would still use the Apache Jira for issue/release tracking etc.
>>>>>>>> 
>>>>>>>> Benefits:
>>>>>>>> 
>>>>>>>> The contributors will gain write access to
>>>>>>>> github.com/apache/incubator-airflow. This would mean we would be able
>>>>>>>> to:
>>>>>>>> 
>>>>>>>> - merge directly on github.com
>>>>>>>> - close stale issues
>>>>>>>> - be able to re-run Travis jobs (I think/hope)
>>>>>>>> 
>>>>>>>> Risks:
>>>>>>>> 
>>>>>>>> Neither of these are likely to be a problem, but the possible
>>>>>>>> downsides are:
>>>>>>>> 
>>>>>>>> - It is still possible to commit to the ASF repo, which if it happens
>>>>>>>> can lead to "split brain" (i.e. different views of master) which will
>>>>>>>> need INFRA team support to fix.
>>>>>>>> 
>>>>>>>> - Contributors will need to agree to Github terms of service. Given
>>>>>>>> this is how PRs are reviewed currently this isn't a problem for any
>>>>>>>> current contributors. Just worth mentioning.
>>>>>>>> 
>>>>>>>> If the vote passes we will need to:
>>>>>>>> 
>>>>>>>> - Update the airflow-pr tool to work directly on github, not ASF repos
>>>>>>>> - Update any docs that point to ASF repos (
>>>>>>>> http://incubator.apache.org/projects/airflow.html,
>>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/Committers%27+Guide
>>>>>>>> - there might be more)
>>>>>>>> - Ensure all committers have access. There is a self-serve process for
>>>>>>>> this (see below)
>>>>>>>> - Open a ticket with the