Re: Disable Processing of DAG file
It is an interesting question. On a slightly related note, correct me if I'm wrong: AFAIK we need to restart the Airflow scheduler for it to pick up any new DAG file changes. In that case, should the scheduler run the DAG file processing every time before scheduling the tasks?

Regards,
Ananth.P

On 28 May 2018 at 21:46, ramandu...@gmail.com wrote:
> Hi All,
>
> We have a use case where there would be 100(s) of DAG files with schedule
> set to "@once". Currently it seems that scheduler processes each and every
> file and creates a Dag Object.
> Is there a way or config to tell scheduler to stop processing certain
> files.
>
> Thanks,
> Raman Gupta
Disable Processing of DAG file
Hi All,

We have a use case with hundreds of DAG files whose schedule is set to "@once". Currently it seems that the scheduler processes each and every file and creates a DAG object for it. Is there a way, or a config option, to tell the scheduler to stop processing certain files?

Thanks,
Raman Gupta
Re: HttpSensor raising exception with status=403
Thanks! I ended up creating a plugin and it's working OK.

On Mon, May 28, 2018 at 9:22 AM Driesprong, Fokko wrote:
> Hi Pedro,
>
> You could just create a CustomHttpHook and place it on your pythonpath,
> then you should also create a CustomHttpSensor. Hope this helps.
>
> Cheers, Fokko
>
> 2018-05-26 2:48 GMT+02:00 Pedro Machado :
>
> > Hi,
> >
> > I am using HttpSensor to look for a file. The webserver is returning 403
> > (instead of 404) while the file is not available. This is causing the
> > sensor to raise an exception.
> >
> > I see that a recent commit added the ability to disable the call to
> > response.raise_for_status() on the http hook by passing
> > extra_options={'check_response': False} to the sensor.
> >
> > https://github.com/apache/incubator-airflow/commit/6c19468e0b3b938249acc43e4b833a753d093efc?diff=unified
> >
> > I am unable to upgrade airflow. What would be the best way to incorporate
> > the new code, perhaps into a custom sensor?
> >
> > Thanks,
> >
> > Pedro
Re: How to wait for external process
Thanks Christopher for the idea. That would work, we already have such a "listener" that polls a queue (SQS) and creates the DAG runs. However it would have been nice to have the full process in one DAG to have a better overview about running jobs and leverage the gantt chart, but I think this can be accomplished via custom plugins and views.

On 05/28/2018 08:43 PM, Christopher Bockman wrote:
> Haven't done this, but we'll have a similar need in the future, so have
> investigated a little.
>
> What about a design pattern something like this:
>
> 1) When jobs are done (ready for further processing) they publish those
> details to a queue (such as GC Pub/Sub or any other sort of queue)
>
> 2) A single "listener" DAG sits and periodically checks that queue. If it
> finds anything on it, it triggers (via DAG trigger) all of the DAGs which
> are on the queue.*
>
> * = if your triggering volume is too high, this may cause airflow issues w/
> too many going at once; this could presumably be solved then via custom
> rate-limiting on firing these
>
> 3) The listener DAG resets itself (triggers itself)
>
>
> On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko wrote:
>
>> Hi Stefan,
>>
>> Afaik there isn't a more efficient way of doing this. DAGs that are relying
>> on a lot of sensors are experiencing the same issues. The only way right
>> now, I can think of, is doing updating the state directly in the database.
>> But then you need to know what you are doing. I can image that this would
>> be feasible by using an AWS lambda function. Hope this helps.
>>
>> Cheers, Fokko
>>
>> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann :
>>
>>> Hello,
>>>
>>> I have a DAG (externally triggered) where some processing is done at an
>>> external system (EC2 instance). The processing is started by an Airflow
>>> task (via HTTP request). The DAG should only continue once that
>>> processing is completed. In a first naive implementation I created a
>>> sensor that gets the progress (via HTTP request) and only if status is
>>> "finished" returns true and the DAG run continues. That works but...
>>>
>>> ... the external processing can take hours or days, and during that time
>>> a worker is occupied which does nothing but HTTP GET and sleep. There
>>> will be hundreds of DAG runs in parallel which means hundreds of workers
>>> are occupied.
>>>
>>> I looked into other operators that do computation on external systems
>>> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
>>> just wait/sleep.
>>>
>>> So I want to ask if there is a more efficient way to build such a
>>> workflow with Airflow?
>>>
>>> Kind Regards,
>>> Stefan
Re: Alert Emails Templatizing
It is a bit tricky.

*Step 1:* You can write an SLA miss callback, send the email from the callback, and empty the `slas` object so that Airflow won't send its own SLA miss email.
https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L688

*Step 2:* You can reuse Airflow's `send_email` method here:
https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L61

*Step 3:* If you are sending the SLA miss email from your callback, you need to mutate the `sla_miss` table yourself, just like
https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L726

I hope this will get simplified in future releases.

Regards,
Ananth.P

On 28 May 2018 at 05:48, vardangupta...@gmail.com wrote:
> Hi team,
>
> We had a use case where we wanted to serve different email body to
> different use cases at the time of failure & up_for_retry, currently body
> seems to be hard coded in models.py, Is there any plan to make it
> templatized in upcoming future or it will be a good idea if we come across
> with code change and contribute? Please suggest recommended way of
> implementing the feature.
>
>
> Regards,
> Vardan Gupta
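Steps 1 and 2 above could look roughly like the sketch below. It is only an illustration under stated assumptions: the five-argument callback signature reflects how the scheduler appears to invoke `sla_miss_callback` and should be checked against your Airflow version; `make_sla_miss_callback`, `send`, and the recipient address are hypothetical names, with `send` injected so the sketch runs without Airflow installed (in a deployment it could be Airflow's `send_email`). Step 3, updating the `sla_miss` table, is not shown.

```python
def make_sla_miss_callback(send, recipients):
    """Build an SLA miss callback that sends a custom email via `send`.

    `send(to, subject, html_content)` is injected (hypothetical wiring) so
    the sketch has no Airflow dependency.
    """
    def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
        if not slas:
            return  # nothing was missed, nothing to report
        subject = "[custom] SLA missed on DAG {}".format(dag.dag_id)
        body = "<b>Missed SLAs:</b><pre>{}</pre>".format(task_list)
        send(recipients, subject, body)
        # Empty the list in place so the caller sees no remaining SLAs.
        # Assumption: the scheduler skips its default email when it is empty.
        del slas[:]

    return sla_miss_callback
```

The callback would then be passed to the DAG as its `sla_miss_callback` argument. Emptying `slas` in place, rather than rebinding the name, matters because the caller holds a reference to the same list.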
Re: How to wait for external process
Since you are already on AWS, the simplest thing I can think of is to write a signal file once the job has finished and have the downstream job wait for that signal file. In other words, the same pattern as Hadoop jobs writing a `_SUCCESS` file that the downstream jobs depend on.

Regards,
Ananth.P

On 28 May 2018 at 13:06, Stefan Seelmann wrote:
> Thanks Christopher for the idea. That would work, we already have such a
> "listener" that polls a queue (SQS) and creates the DAG runs. However it
> would have been nice to have the full process in one DAG to have a
> better overview about running jobs and leverage the gantt chart, but I
> think this can be accomplished via custom plugins and views.
>
> On 05/28/2018 08:43 PM, Christopher Bockman wrote:
> > Haven't done this, but we'll have a similar need in the future, so have
> > investigated a little.
> >
> > What about a design pattern something like this:
> >
> > 1) When jobs are done (ready for further processing) they publish those
> > details to a queue (such as GC Pub/Sub or any other sort of queue)
> >
> > 2) A single "listener" DAG sits and periodically checks that queue. If it
> > finds anything on it, it triggers (via DAG trigger) all of the DAGs which
> > are on the queue.*
> >
> > * = if your triggering volume is too high, this may cause airflow issues w/
> > too many going at once; this could presumably be solved then via custom
> > rate-limiting on firing these
> >
> > 3) The listener DAG resets itself (triggers itself)
> >
> >
> > On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko wrote:
> >
> >> Hi Stefan,
> >>
> >> Afaik there isn't a more efficient way of doing this. DAGs that are relying
> >> on a lot of sensors are experiencing the same issues. The only way right
> >> now, I can think of, is doing updating the state directly in the database.
> >> But then you need to know what you are doing. I can image that this would
> >> be feasible by using an AWS lambda function. Hope this helps.
> >>
> >> Cheers, Fokko
> >>
> >> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann :
> >>
> >>> Hello,
> >>>
> >>> I have a DAG (externally triggered) where some processing is done at an
> >>> external system (EC2 instance). The processing is started by an Airflow
> >>> task (via HTTP request). The DAG should only continue once that
> >>> processing is completed. In a first naive implementation I created a
> >>> sensor that gets the progress (via HTTP request) and only if status is
> >>> "finished" returns true and the DAG run continues. That works but...
> >>>
> >>> ... the external processing can take hours or days, and during that time
> >>> a worker is occupied which does nothing but HTTP GET and sleep. There
> >>> will be hundreds of DAG runs in parallel which means hundreds of workers
> >>> are occupied.
> >>>
> >>> I looked into other operators that do computation on external systems
> >>> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
> >>> just wait/sleep.
> >>>
> >>> So I want to ask if there is a more efficient way to build such a
> >>> workflow with Airflow?
> >>>
> >>> Kind Regards,
> >>> Stefan
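The signal-file idea at the top of this message can be sketched with plain files. This is a minimal illustration, not an Airflow API: `mark_finished` and `job_finished` are hypothetical helper names, and a local directory stands in for what would more likely be an S3 prefix on AWS.

```python
import os
import tempfile


def mark_finished(output_dir, marker="_SUCCESS"):
    """Called by the external job as its very last step."""
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, marker), "w"):
        pass  # an empty file is enough; only its existence matters


def job_finished(output_dir, marker="_SUCCESS"):
    """Cheap check a downstream task or sensor could run on each poll."""
    return os.path.isfile(os.path.join(output_dir, marker))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        print(job_finished(workdir))  # False
        mark_finished(workdir)
        print(job_finished(workdir))  # True
```

Writing the marker last means its presence implies the output is complete, which is what makes the check safe for downstream consumers.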
Ability to discover custom plugins, operators, sensors, etc. from various locations
Hi,

I was trying to find out whether it's possible to create custom plugins without dropping them into `$AIRFLOW_HOME/plugins` (or the directory defined in `airflow.cfg`). We can define one location in `airflow.cfg`, but I have multiple projects which will have their own workflows, so ideally I would want to implement custom plugins (along with operators, sensors, etc.) in those repositories themselves and let them be discovered from there.

After checking the documentation and browsing the code, I concluded that the only way to get this done is to ensure that `$PYTHONPATH` includes the directories where these custom modules are stored.

So, I want to ask if there is a better way to handle this? If not, can `plugins_manager` (AirflowPlugin) be improved to suit this use case?

Kind Regards,
Ritesh
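For illustration, the `$PYTHONPATH` workaround described above has an in-process equivalent: extending `sys.path` before the custom modules are imported. The sketch below is an assumption-laden example, not part of Airflow; `add_plugin_dirs` and the directory names are hypothetical.

```python
import os
import sys


def add_plugin_dirs(dirs, path=None):
    """Append each directory to the import path once; return what was added.

    `path` defaults to sys.path and is a parameter only so the function can
    be tried out without touching the real import path.
    """
    path = sys.path if path is None else path
    added = []
    for directory in dirs:
        directory = os.path.abspath(directory)
        if directory not in path:
            path.append(directory)
            added.append(directory)
    return added
```

A call such as `add_plugin_dirs(["/repos/project_a/airflow", "/repos/project_b/airflow"])` would make modules in both repositories importable, with the same effect as listing those directories in `$PYTHONPATH`.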
Re: How to wait for external process
Haven't done this, but we'll have a similar need in the future, so have investigated a little.

What about a design pattern something like this:

1) When jobs are done (ready for further processing) they publish those details to a queue (such as GC Pub/Sub or any other sort of queue)

2) A single "listener" DAG sits and periodically checks that queue. If it finds anything on it, it triggers (via DAG trigger) all of the DAGs which are on the queue.*

* = if your triggering volume is too high, this may cause airflow issues w/ too many going at once; this could presumably be solved then via custom rate-limiting on firing these

3) The listener DAG resets itself (triggers itself)

On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko wrote:
> Hi Stefan,
>
> Afaik there isn't a more efficient way of doing this. DAGs that are relying
> on a lot of sensors are experiencing the same issues. The only way right
> now, I can think of, is doing updating the state directly in the database.
> But then you need to know what you are doing. I can image that this would
> be feasible by using an AWS lambda function. Hope this helps.
>
> Cheers, Fokko
>
> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann :
>
> > Hello,
> >
> > I have a DAG (externally triggered) where some processing is done at an
> > external system (EC2 instance). The processing is started by an Airflow
> > task (via HTTP request). The DAG should only continue once that
> > processing is completed. In a first naive implementation I created a
> > sensor that gets the progress (via HTTP request) and only if status is
> > "finished" returns true and the DAG run continues. That works but...
> >
> > ... the external processing can take hours or days, and during that time
> > a worker is occupied which does nothing but HTTP GET and sleep. There
> > will be hundreds of DAG runs in parallel which means hundreds of workers
> > are occupied.
> >
> > I looked into other operators that do computation on external systems
> > (ECSOperator, AWSBatchOperator) but they also follow that pattern and
> > just wait/sleep.
> >
> > So I want to ask if there is a more efficient way to build such a
> > workflow with Airflow?
> >
> > Kind Regards,
> > Stefan
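Step 2 of the listener pattern at the top of this message, including the rate-limiting caveat, might be sketched as follows. Everything here is hypothetical scaffolding: an in-memory `deque` stands in for the real queue (Pub/Sub, SQS, ...), the message shape (`dag_id`, optional `conf`) is an assumption, and `trigger` is whatever callable actually starts a DAG run.

```python
from collections import deque


def drain_and_trigger(pending, trigger, max_triggers=10):
    """Trigger at most `max_triggers` queued DAG runs; leave the rest queued.

    Capping the batch size is the simple rate limit: anything left over is
    picked up the next time the listener runs.
    """
    triggered = []
    while pending and len(triggered) < max_triggers:
        message = pending.popleft()
        trigger(message["dag_id"], message.get("conf"))
        triggered.append(message["dag_id"])
    return triggered


if __name__ == "__main__":
    queue = deque([{"dag_id": "encode", "conf": {"video": "a.mp4"}},
                   {"dag_id": "encode", "conf": {"video": "b.mp4"}}])
    drain_and_trigger(queue, lambda dag_id, conf: print(dag_id, conf),
                      max_triggers=1)
    print(len(queue))  # 1 message is left for the next listener run
```

With a real queue, the message should only be acknowledged or deleted after the trigger call succeeds, so a failed trigger is retried on the next pass.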
Alert Emails Templatizing
Hi team,

We have a use case where we want to send a different email body for different cases at the time of failure and up_for_retry. Currently the body seems to be hard-coded in models.py. Is there any plan to make it templated in the near future, or would it be a good idea for us to make the code change and contribute it? Please suggest the recommended way of implementing this feature.

Regards,
Vardan Gupta
Re: HttpSensor raising exception with status=403
Hi Pedro,

You could just create a CustomHttpHook and place it on your pythonpath, then you should also create a CustomHttpSensor. Hope this helps.

Cheers, Fokko

2018-05-26 2:48 GMT+02:00 Pedro Machado:
> Hi,
>
> I am using HttpSensor to look for a file. The webserver is returning 403
> (instead of 404) while the file is not available. This is causing the
> sensor to raise an exception.
>
> I see that a recent commit added the ability to disable the call to
> response.raise_for_status() on the http hook by passing
> extra_options={'check_response': False} to the sensor.
>
> https://github.com/apache/incubator-airflow/commit/6c19468e0b3b938249acc43e4b833a753d093efc?diff=unified
>
> I am unable to upgrade airflow. What would be the best way to incorporate
> the new code, perhaps into a custom sensor?
>
> Thanks,
>
> Pedro
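The core decision a custom sensor would need, treating 403 like 404 as "not there yet", can be sketched without Airflow. The names below are hypothetical stand-ins: `HttpStatusError` plays the role of whatever exception the hook raises on an error status, and `fetch` is any zero-argument callable that performs the request. A real CustomHttpSensor would put this logic in its `poke` method.

```python
class HttpStatusError(Exception):
    """Stand-in for the exception a hook raises on a non-2xx response."""

    def __init__(self, status_code, reason=""):
        super(HttpStatusError, self).__init__(
            "{}:{}".format(status_code, reason))
        self.status_code = status_code


def poke_once(fetch, not_ready_statuses=(403, 404)):
    """Return True when the resource is available, False to keep waiting.

    `fetch` performs the request and raises HttpStatusError on an error
    status (hypothetical interface).
    """
    try:
        fetch()
    except HttpStatusError as err:
        if err.status_code in not_ready_statuses:
            return False  # file not there yet; poke again later
        raise  # any other status is a genuine failure
    return True
```

Keeping the tolerated statuses in a parameter avoids hard-coding 403, since other servers may signal a missing file differently.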
Re: How to wait for external process
Hi Stefan,

Afaik there isn't a more efficient way of doing this. DAGs that rely on a lot of sensors experience the same issue. The only way I can think of right now is updating the state directly in the database, but then you need to know what you are doing. I can imagine that this would be feasible using an AWS Lambda function. Hope this helps.

Cheers, Fokko

2018-05-26 17:50 GMT+02:00 Stefan Seelmann:
> Hello,
>
> I have a DAG (externally triggered) where some processing is done at an
> external system (EC2 instance). The processing is started by an Airflow
> task (via HTTP request). The DAG should only continue once that
> processing is completed. In a first naive implementation I created a
> sensor that gets the progress (via HTTP request) and only if status is
> "finished" returns true and the DAG run continues. That works but...
>
> ... the external processing can take hours or days, and during that time
> a worker is occupied which does nothing but HTTP GET and sleep. There
> will be hundreds of DAG runs in parallel which means hundreds of workers
> are occupied.
>
> I looked into other operators that do computation on external systems
> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
> just wait/sleep.
>
> So I want to ask if there is a more efficient way to build such a
> workflow with Airflow?
>
> Kind Regards,
> Stefan
Re: Using Airflow with dataset dependant flows (not date)
This seemed like a very clear explanation of the JIRA ticket and the idea of making DAG runs depend not on a schedule but on the arrival of a dataset. I think a lot would have to change if the execution date were changed to a parameterized value, and that's not the only thing that would have to change to support a dataset trigger.

Thinking about the video encoding example, it seems the Airflow way to do that would be to have the dataset DAGs depend on a controller DAG that is scheduled frequently and runs just a TriggerDagRunOperator. Its python callable polls for new datasets (or subscribes to a queue of updates about them), then decides which DAG ID to trigger for the particular dataset and what dag_run_obj.payload should be to tell it which dataset to run on. You might want to write a plugin that gives a different kind of tree view for DAGs triggered this way, so that you can easily see the dataset and payload specifics in the overview of the runs.

There's an example of triggering a DAG with an assigned payload:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py

And an example of the triggered DAG using the payload:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py

The latter part works the same way as when a CLI-triggered DAG accepts a conf object. The experimental API also contains a way of triggering with a conf object:
https://github.com/apache/incubator-airflow/blob/master/airflow/www/api/experimental/endpoints.py#L42

So if you wanted to skip the high-frequency trigger controller DAG and use a queue instead, such as an SQS queue to which you could subscribe an HTTPS trigger, then the queue system could trigger a target DAG through the API.
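The controller callable described above could look something like this sketch, modelled on the linked trigger-controller example. It is illustrative only: `DagRunOrder` is a local stand-in for the object the operator passes in, and `choose_dataset` and the `new_datasets` param are hypothetical names.

```python
class DagRunOrder(object):
    """Local stand-in for the run-order object passed to the callable."""

    def __init__(self, run_id=None, payload=None):
        self.run_id = run_id
        self.payload = payload


def choose_dataset(context, dag_run_obj):
    """Decide whether to trigger, and tell the target which dataset to use.

    Returning None means "do not trigger"; returning the object with a
    payload triggers the target DAG, which reads the payload as its conf.
    """
    datasets = context["params"].get("new_datasets", [])
    if not datasets:
        return None
    dag_run_obj.payload = {"dataset": datasets[0]}
    return dag_run_obj
```

In a real controller DAG the list of new datasets would come from polling storage or a queue rather than from `params`, and a distinct `run_id` per dataset would help tell the runs apart in the UI.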
Does this help you use Airflow more concretely for your needs, or are you looking to fill in a feature for some part of the roadmap that doesn't yet exist?

-Daniel

On 5/18/18, 4:52 PM, "Javier Domingo Cansino" wrote:

Hello Guys,

First of all, I have submitted the idea to JIRA[1], and after speaking with the guys at gitter, they told me to bring the discussion here too.

Right now Airflow only understands being a date-based scheduler. It is extremely complete in that sense, and makes it really easy to populate and backfill your DAGs. Monitoring is quite decent, and can be improved through plugins. Everything is code, as opposed to most of the alternatives out there[2][3][4], and you may or may not depend on files existing to go to the next step. There is a UI that lets you visualize the status of your systems and manually trigger jobs.

There is a limitation, however, in running on dates only: sometimes there are DAGs that do not depend on the date, but on the dataset. Some examples I am close to:

* Bioinf pipeline, where you process samples
* Media pipeline, where you may process different videos/audios in the same way

Right now I am using Snakemake for the first ones, and bash scripts for the second one; however, I have thought that maybe Airflow could be a solution to these two problems.

I have been reading the code, and although the term execution_date is quite coupled, it seems like it could be doable to abstract the datatype of this parametrization variable (datetime) and extend it to be something that could depend on something else (string). After all, from what I have seen, execution_date is just the parametrization variable.

Questions I would like to ask:

* Is this some need you have had? If so, how did you solve it? Is there any other tool with the features I described that could help me with that?
* How do you recommend solving this with Airflow?
* In gitter people have proposed forgetting about execution_dates, just triggering the DAGs and parametrizing the run through variables. However, this has the drawback of losing execution tracking, and makes it impossible to run several DAGs at the same time for different datasets.
* There was also the proposal to instantiate subDAGs per dataset, and have one DAG where the first step is to read which samples to run on. The problem I see with this is that you lose track of which samples have been run, and you cannot have per-sample historic data.
* Airflow works well when you have datasets that change; therefore, another solution would be to instantiate one DAG per sample, and then have a single execution. However, this sounds a bit overkill to me, because you would just have one DAGRun per DAG.
* If this is something that would