Re: Can a DAG be conditionally hidden from the UI?

2018-10-08 Thread Chris Palmer
I like James's solution better, but the initial thought I had was to deploy
.airflowignore files to the environments to filter out files that should not
be processed when filling the DagBag.
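
For example, an environment that should not load certain DAG files could get
an .airflowignore dropped into its dags folder; each non-empty line is a
regular expression matched against file paths under that folder (the file
names below are made up for illustration):

prod_only_.*\.py
integration4/.*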

Chris

On Mon, Oct 8, 2018 at 10:22 AM James Meickle
 wrote:

> As long as the Airflow process can't find the DAG as a top-level object in
> the module, it won't be registered. For example, we have a function that
> returns DAGs; the function returns nothing if it's not in the right
> environment.
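
A minimal sketch of that pattern, assuming a made-up AIRFLOW_ENV environment
variable that identifies the deployment:

import os
from datetime import datetime

from airflow import DAG


def get_dag(allowed_envs):
    # Return the DAG only in the environments where it should exist;
    # returning None means nothing top-level gets registered in the DagBag.
    if os.environ.get('AIRFLOW_ENV') not in allowed_envs:
        return None
    return DAG('integration_only_dag',
               start_date=datetime(2018, 1, 1),
               schedule_interval='@daily')


dag = get_dag({'integration4', 'dev', 'local'})
if dag is not None:
    # define the tasks against `dag` here
    pass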
>
> On Sun, Oct 7, 2018 at 2:31 PM Shah Altaf  wrote:
>
> > Hi all,
> >
> > tl;dr - Is it possible to conditionally hide a DAG from the UI based on
> an
> > environment variable?
> >
> >
> >
> > Our team has a single repo with several DAGs in it and we deploy it
> across
> > multiple 'environments' (think dev, test, and other integration
> > environments).  While most DAGs are meant to run everywhere, we do have
> > several which are meant to run in one and only one environment.  Of
> course
> > they are all paused, but it would be nice to declutter a bit for
> ourselves.
> >
> > My question then - is it possible to conditionally hide a DAG from the UI
> > based on an environment variable or some flag somewhere.
> >
> > This is just wishful thinking - the dev could do something like
> >
> > dag = get_dag(...), and get_dag() would have a decorator like
> > @only_run_in("integration4,dev,local")
> >
> > And that decorator returns some kind of None object or special DAG which
> > just doesn't appear in the list.
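
For what it's worth, a decorator like that can be written today in plain
Python; a rough sketch (AIRFLOW_ENV is a made-up variable, and the DAG simply
never reaches the DagBag when the factory returns None):

import os
from datetime import datetime
from functools import wraps

from airflow import DAG


def only_run_in(envs):
    # Wrap a DAG factory so it returns None outside the listed environments.
    allowed = set(envs.split(','))

    def decorator(dag_factory):
        @wraps(dag_factory)
        def wrapper(*args, **kwargs):
            if os.environ.get('AIRFLOW_ENV') not in allowed:
                return None
            return dag_factory(*args, **kwargs)
        return wrapper
    return decorator


@only_run_in("integration4,dev,local")
def get_dag():
    return DAG('env_specific_dag',
               start_date=datetime(2018, 1, 1),
               schedule_interval='@daily')


dag = get_dag()  # None everywhere except integration4/dev/local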
> >
> > Or perhaps some other way to accomplish this same effect - any ideas
> would
> > be appreciated.
> >
> > Thanks
> >
>


Re: Solved: suppress PendingDeprecationWarning messages in airflow logs

2018-09-28 Thread Chris Palmer
Doesn't this warning come from the BaseOperator class -
https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L2511
?

Are you passing unused arguments to the QuboleOperator, or do you not
control the instantiation of those operators?

Chris

On Fri, Sep 28, 2018 at 7:39 PM Sean Carey 
wrote:

> Haha yes, good point.  Thanks!
>
> Sean
>
>
> On 9/28/18, 6:13 PM, "Ash Berlin-Taylor"  wrote:
>
> Sounds good for your use, certainly.
>
> I mainly wanted to make sure other people knew before blindly
> equipping a foot-cannon :)
>
> -ash
>
> > On 29 Sep 2018, at 00:09, Sean Carey 
> wrote:
> >
> > Thanks, Ash.  I understand what you're saying.  The warnings are
> coming from the Qubole operator.  We get a lot of this:
> >
> > PendingDeprecationWarning: Invalid arguments were passed to
> QuboleOperator. Support for passing such arguments will be dropped in
> Airflow 2.0. Invalid arguments were:
> > *args: ()
> > **kwargs: {...}
> >  category=PendingDeprecationWarning
> >
> > We've spoken to Qubole about this and they plan to address it.  In
> the meantime, it generates a ton of noise in our logs which we'd like to
> suppress -- we are aware of the issue, and don't need to be told about it
> every 5 seconds.
> >
> > As for it suddenly breaking, being that this is pending Airflow 2.0
> I feel the risk is low and when we do upgrade it will be thoroughly tested.
> >
> > Thanks!
> >
> > Sean
> >
> >
> > On 9/28/18, 5:01 PM, "Ash Berlin-Taylor"  wrote:
> >
> >What deprecation warnings are you getting? Are they from Airflow
> itself (i.e. things Airflow is calling like flask_wtf, etc) or from your use
> of Airflow?
> >
> >If it is the former, could you check and see if someone has already
> reported a Jira issue so we can fix them?
> https://issues.apache.org/jira/issues/?jql=project%3DAIRFLOW
> >
> >If it is the latter PLEASE DO NOT IGNORE THESE.
> >
> >Deprecation warnings are how we, the Airflow community tell users
> that you need to make a change to your DAG/code/config to upgrade things.
> If you silence these warnings you will have a much harder time upgrading to
> new versions of Airflow (read: you might suddenly find that things stop
> working because you turned off the warnings.)
> >
> >-ash
> >
> >> On 28 Sep 2018, at 22:52, Sean Carey 
> wrote:
> >>
> >> Hello,
> >>
> >> I’ve been looking for a way to suppress the
> PendingDeprecationWarning messages cluttering our airflow logs and I have a
> working solution which I thought I would share.
> >>
> >> In order to do this, you first need to configure airflow for custom
> logging using steps 1-4 here:
> >>
> >>
> https://airflow.readthedocs.io/en/stable/howto/write-logs.html#writing-logs-to-azure-blob-storage
> >>
> >> (note that although the document is for Azure remote logging you
> don’t actually need azure for this)
> >>
> >> Next, modify the log_config.py script created in the step above as
> follows:
> >>
> >>
> >> 1.  Import logging
> >> 2.  Define the filter class:
> >>
> >>
> >>
> >> class DeprecationWarningFilter(logging.Filter):
> >>
> >>     def filter(self, record):
> >>
> >>         allow = 'DeprecationWarning' not in record.msg
> >>
> >>         return allow
> >>
> >>
> >> 3.  Add a “filters” section to the LOGGING_CONFIG beneath
> >> “formatters”:
> >>
> >>
> >>
> >> 'filters': {
> >>
> >>     'noDepWarn': {
> >>
> >>         '()': DeprecationWarningFilter,
> >>
> >>     }
> >>
> >> },
> >>
> >>
> >> 4.  For each of the handlers where you want to suppress the
> warnings (console, task, processor, or any of the remote log handlers you
> may be using) add the following line to its configuration:
> >>
> >>
> >>
> >> 'filters': ['noDepWarn'],
> >>
> >> Restart airflow and your logs should be clean.
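
Pulling those steps together, the additions to log_config.py end up looking
roughly like this. This is a sketch that assumes a 1.10-style setup where the
default config can be copied from airflow.config_templates.airflow_local_settings;
handler names may differ in your installation (the logging_config_class setting
from the linked steps then points at this LOGGING_CONFIG):

import logging
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG


class DeprecationWarningFilter(logging.Filter):
    def filter(self, record):
        # Keep the record only if its message doesn't mention a DeprecationWarning
        return 'DeprecationWarning' not in str(record.msg)


LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG['filters'] = {
    'noDepWarn': {'()': DeprecationWarningFilter},
}
for handler_name in ('console', 'task', 'processor'):
    handler = LOGGING_CONFIG['handlers'][handler_name]
    handler.setdefault('filters', []).append('noDepWarn')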
> >>
> >>
> >> Sean Carey
> >>
> >
> >
> >
>
>
>
>


Re: execution_date - can we stop the confusion?

2018-09-27 Thread Chris Palmer
While taking a step back makes some sense, we also need to identify what
the issue is. Simply saying 'execution_date behavior is confusing to new
users' isn't good enough. What is confusing about it? Is it what it
represents, or just the name itself?

There are a number of different timestamps that might be of interest,
including (but not limited to):

*Identifying timestamp*
For any time interval, there are two natural choices of timestamps to
represent that interval, the left and right bounds. For Airflow the left
bound has been chosen, and is called execution_date. For various reasons, I
think that makes a much better choice than the right bound.

*Create/update/delete timestamps*
Timestamps representing when particular database records were created,
updated, and/or deleted. I don't believe that Airflow currently records
these.

*Runtime timestamps*
The timestamps that a task or other process started and stopped. Airflow
records these for Tasks, but I think the implementation is maybe a little
lacking for DagRuns.


So what's the confusion with execution_date? Is it what it represents or
the name itself?

I think part of the learning curve with Airflow is understanding that
execution_date is the left bound of the interval. No matter what name you
use for the identifying timestamp I think new users will need to learn what
that choice means. Changing the name won't magically make all the confusion
go away.
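
As a concrete illustration of the left-bound convention (nothing more than a
toy example):

from datetime import datetime, timedelta

from airflow import DAG

# A daily DAG starting 2018-01-01. The DagRun covering the interval
# [2018-01-01, 2018-01-02) has execution_date 2018-01-01 (the left bound),
# and the scheduler only creates it once that interval has closed, i.e.
# shortly after 2018-01-02 00:00.
dag = DAG(
    dag_id='interval_example',
    start_date=datetime(2018, 1, 1),
    schedule_interval=timedelta(days=1),
)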

While I don't think execution_date is the greatest name in the world, it's
a lot better than the suggested alternative run_stamped. Tasks also have an
identifying timestamp, and if I saw run_stamped on a Task I would have no
idea what it means (stamped by what?).

While there may be better names than execution_date, I don't think they are
so much better that it is worth the effort to overhaul such an integral
part of Airflow. Maybe some improvements to the documentation could be
made, but nothing so drastic as renaming such a core item.


As for the second suggestion to add "a new variable which indicated the
actual datetime when the DAG run was generated. call it
execution_start_date". It is very unclear what the desired outcome is with
this.

To me "generated" implies creation time, i.e. recorded in the database.
However, creation of a DagRun record in the database is a distinct event
from when Tasks associated with that DagRun start executing. Plus DagRuns
themselves don't actually "run" - Tasks are the only thing that really gets
run by Airflow.

What is actually desired here?
 - The right bound of the schedule interval?
 - The time the DagRun was created?
 - The time that any Tasks associated with a DagRun were first considered
by the scheduler?
 - The time that any Tasks associated with a DagRun were first scheduled?
 - The time that any Tasks associated with a DagRun were actually started
by a worker?


The lack of clarity and completeness around these suggestions, alongside
inane declarations like "This name won't cause people to get confused" is
hardly a good way to get people to take suggestions seriously.

Chris


On Wed, Sep 26, 2018 at 7:37 PM George Leslie-Waksman 
wrote:

> This comes up a lot. I've seen it on this mailing list multiple times and
> it's something that I have to explicitly call out to every single person
> that I've helped train up on Airflow.
>
> If we take a moment to set aside why things are the way they are, what the
> documentation says, and how experienced users feel things should behave;
> there still remains the fact that a lot of new users get confused by how
> "execution_date" works.
>
> Whether it's a problem, whether we need to do something, and what we could
> do are all separate questions but I think it's important that we
> acknowledge and start from:
>
> A lot of new users get confused by how "execution_date" works.
>
> I recognize that some of this is a learning curve issue and some of this is
> a mindset issue but it begs the question: do enough users benefit from the
> current structure to justify the harm to new users?
>
> --George
>
> On Wed, Sep 26, 2018 at 1:40 PM Brian Greene <
> br...@heisenbergwoodworking.com> wrote:
>
> > It took a minute to grok, but in the larger context of how af works it
> > makes perfect sense the way it is.  Changing something so fundamentally
> > breaking to every dag in existence should bring a comparable benefit.
> > Beyond the avoiding teaching a concept you disagree with, what benefits
> > does the proposal bring to offset the cost of change?
> >
> > I’m gonna make a meme - “do you even airflow bro?”
> >
> > Sent from a device with less than stellar autocorrect
> >
> > > On Sep 26, 2018, at 8:33 AM, Maxime Beauchemin <
> > maximebeauche...@gmail.com> wrote:
> > >
> > > I think if you have a functional mindset (as in "functional data
> > engineering
> > > <
> >
> https://medium.com/@maximebeauchemin/functional-data-engineering-a-modern-paradigm-for-batch-data-processing-2327ec32c42a
> > >")
> > > as opposed to a cro

Re: Creating dynamic pool from task

2018-09-25 Thread Chris Palmer
David,

I was playing around with this over the weekend, and mostly found that it
doesn't seem to be possible. I was able to get an operator to template out
the pool attribute when it renders its templates. However, this doesn't
normally happen until execution, and so the un-templated pool attribute
gets used when the scheduler sends the task to the executor.
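
For reference, this is roughly what I tried; the pool field does render, but
only at execution time, which is after the scheduler has already used the raw
value (a sketch, not a recommendation):

from airflow.operators.bash_operator import BashOperator


class PoolTemplatedBashOperator(BashOperator):
    # Adding 'pool' to template_fields means a value like
    # 'cluster_pool_{{ run_id }}' is substituted when the task instance
    # renders its templates, i.e. at execution time - too late, because the
    # scheduler has already read the un-rendered pool name.
    template_fields = BashOperator.template_fields + ('pool',)


# compute = PoolTemplatedBashOperator(
#     task_id='compute_1',
#     bash_command='run_compute.sh',
#     pool='cluster_pool_{{ run_id }}',
#     dag=dag,
# )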

Chris

On Fri, Sep 21, 2018 at 6:12 PM Chris Palmer  wrote:

> I see, so for a given DagRun you want to limit the compute tasks that are
> running. But I'm guessing you want multiple DagRuns to be able to run
> concurrently to operate on their own clusters independently.
>
> From what I could tell in the code, the pool gets checked before execution
> (which is when templates are rendered). Which makes dynamic pools difficult
> to do.
>
> It's probably possible to find a solution but I think it's likely going to
> involve some ugly code/inspection of the python stack.
>
> Chris
>
> On Sep 21, 2018 4:47 PM, "David Szakallas" 
> wrote:
>
> Chris, the tasks are independent of each other so they can run
> concurrently. I have to limit the concurrency though, so they don’t starve.
> As the cluster is created dynamically with a task, a shared pool with other
> DAGs or other runs of the same DAG is not preferable.
>
> I imagined something like this:
>
>                                        .——> [compute_1] ——.
>                                       / ——> [compute_2] —— \
> [create_cluster] —> [create_pool_x6] .         ...          . [delete_pool] —> [delete_cluster]
>                                       \ ——> [compute_19] —— /
>                                        .——> [compute_20] ——.
> Thanks,
> David
>
>
> > On Sep 21, 2018, at 7:23 PM, Chris Palmer  wrote:
> >
> > What would cause multiple computation tasks to run on the cluster at the
> > same time? Are you worried about concurrent DagRuns? Does setting dag
> > concurrency and/or task concurrency appropriately solve your problem?
> >
> > Chris
> >
> > On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <
> dszakal...@whitepages.com>
> > wrote:
> >
> >> What I am doing is very similar. However I am including the DagRun's id
> in
> >> the pool name to make it unique, as I need to make sure every run gets
> its
> >> own pool. I am getting that from the context object, which is only
> >> available within execute methods or templates. How do you make sure each
> >> run has its own pool?
> >>
> >>
> >> Thanks,
> >>
> >> Dávid Szakállas
> >> Software Engineer | Whitepages Data Services
> >>
> >> 
> >> From: Taylor Edmiston 
> >> Sent: Thursday, September 20, 2018 6:17:05 PM
> >> To: dev@airflow.incubator.apache.org
> >> Subject: Re: Creating dynamic pool from task
> >>
> >> I've done something similar.  I have a task at the front of the DAG that
> >> ensures the connection pool exists and creates the pool if it doesn't.
> >> I've pasted my code below.  This runs in a for loop that creates one DAG
> >> per iteration each with its own pool.  Then I pass the pool name into
> the
> >> sensors.
> >>
> >> Does this work for your use case?
> >>
> >> --
> >>
> >> redshift_pool = PythonOperator(
> >>     task_id='redshift_pool',
> >>     dag=dag,
> >>     python_callable=ensure_redshift_pool,
> >>     op_kwargs={
> >>         'name': workflow.pool,
> >>         'slots': REDSHIFT_POOL_SLOTS,
> >>     },
> >>     ...
> >> )
> >>
> >> @provide_session
> >> def ensure_redshift_pool(name, slots, session=None):
> >>     pool = Pool(pool=name, slots=slots)
> >>     pool_query = (
> >>         session.query(Pool)
> >>         .filter(Pool.pool == name)
> >>     )
> >>     pool_query_result = pool_query.one_or_none()
> >>     if not pool_query_result:
> >>         logger.info(f'redshift pool "{name}" does not exist - creating it')
> >>         session

Re: Creating dynamic pool from task

2018-09-21 Thread Chris Palmer
I see, so for a given DagRun you want to limit the compute tasks that are
running. But I'm guessing you want multiple DagRuns to be able to run
concurrently to operate on their own clusters independently.

From what I could tell in the code, the pool gets checked before execution
(which is when templates are rendered). Which makes dynamic pools difficult
to do.

It's probably possible to find a solution but I think it's likely going to
involve some ugly code/inspection of the python stack.

Chris

On Sep 21, 2018 4:47 PM, "David Szakallas" 
wrote:

Chris, the tasks are independent of each other so they can run
concurrently. I have to limit the concurrency though, so they don’t starve.
As the cluster is created dynamically with a task, a shared pool with other
DAGs or other runs of the same DAG is not preferable.

I imagined something like this:

                                        .——> [compute_1] ——.
                                       / ——> [compute_2] —— \
[create_cluster] —> [create_pool_x6] .         ...          . [delete_pool] —> [delete_cluster]
                                       \ ——> [compute_19] —— /
                                        .——> [compute_20] ——.
Thanks,
David


> On Sep 21, 2018, at 7:23 PM, Chris Palmer  wrote:
>
> What would cause multiple computation tasks to run on the cluster at the
> same time? Are you worried about concurrent DagRuns? Does setting dag
> concurrency and/or task concurrency appropriately solve your problem?
>
> Chris
>
> On Thu, Sep 20, 2018 at 8:31 PM David Szakallas 
> wrote:
>
>> What I am doing is very similar. However I am including the DagRun's id
in
>> the pool name to make it unique, as I need to make sure every run gets
its
>> own pool. I am getting that from the context object, which is only
>> available within execute methods or templates. How do you make sure each
>> run has its own pool?
>>
>>
>> Thanks,
>>
>> Dávid Szakállas
>> Software Engineer | Whitepages Data Services
>>
>> 
>> From: Taylor Edmiston 
>> Sent: Thursday, September 20, 2018 6:17:05 PM
>> To: dev@airflow.incubator.apache.org
>> Subject: Re: Creating dynamic pool from task
>>
>> I've done something similar.  I have a task at the front of the DAG that
>> ensures the connection pool exists and creates the pool if it doesn't.
>> I've pasted my code below.  This runs in a for loop that creates one DAG
>> per iteration each with its own pool.  Then I pass the pool name into the
>> sensors.
>>
>> Does this work for your use case?
>>
>> --
>>
> >> redshift_pool = PythonOperator(
> >>     task_id='redshift_pool',
> >>     dag=dag,
> >>     python_callable=ensure_redshift_pool,
> >>     op_kwargs={
> >>         'name': workflow.pool,
> >>         'slots': REDSHIFT_POOL_SLOTS,
> >>     },
> >>     ...
> >> )
> >>
> >> @provide_session
> >> def ensure_redshift_pool(name, slots, session=None):
> >>     pool = Pool(pool=name, slots=slots)
> >>     pool_query = (
> >>         session.query(Pool)
> >>         .filter(Pool.pool == name)
> >>     )
> >>     pool_query_result = pool_query.one_or_none()
> >>     if not pool_query_result:
> >>         logger.info(f'redshift pool "{name}" does not exist - creating it')
> >>         session.add(pool)
> >>         session.commit()
> >>         logger.info(f'created redshift pool "{name}"')
> >>     else:
> >>         logger.info(f'redshift pool "{name}" already exists')
>>
>> --
>>
>> *Taylor Edmiston*
>> Blog <https://blog.tedmiston.com/> | LinkedIn
>> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
>> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer
Story
>> <https://stackoverflow.com/story/taylor>
>>
>>
>>
>> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <
>> dszakal...@whitepages.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a DAG that creates a cluster, starts computation tasks, and after
>>> they completed, tears down the cluster. I want to limit concurrency for
>> the
>>> computation tasks carried on this cluster to fixed 

Re: Creating dynamic pool from task

2018-09-21 Thread Chris Palmer
What would cause multiple computation tasks to run on the cluster at the
same time? Are you worried about concurrent DagRuns? Does setting dag
concurrency and/or task concurrency appropriately solve your problem?

Chris

On Thu, Sep 20, 2018 at 8:31 PM David Szakallas 
wrote:

> What I am doing is very similar. However I am including the DagRun's id in
> the pool name to make it unique, as I need to make sure every run gets its
> own pool. I am getting that from the context object, which is only
> available within execute methods or templates. How do you make sure each
> run has its own pool?
>
>
> Thanks,
>
> Dávid Szakállas
> Software Engineer | Whitepages Data Services
>
> 
> From: Taylor Edmiston 
> Sent: Thursday, September 20, 2018 6:17:05 PM
> To: dev@airflow.incubator.apache.org
> Subject: Re: Creating dynamic pool from task
>
> I've done something similar.  I have a task at the front of the DAG that
> ensures the connection pool exists and creates the pool if it doesn't.
> I've pasted my code below.  This runs in a for loop that creates one DAG
> per iteration each with its own pool.  Then I pass the pool name into the
> sensors.
>
> Does this work for your use case?
>
> --
>
> redshift_pool = PythonOperator(
>     task_id='redshift_pool',
>     dag=dag,
>     python_callable=ensure_redshift_pool,
>     op_kwargs={
>         'name': workflow.pool,
>         'slots': REDSHIFT_POOL_SLOTS,
>     },
>     ...
> )
>
> @provide_session
> def ensure_redshift_pool(name, slots, session=None):
>     pool = Pool(pool=name, slots=slots)
>     pool_query = (
>         session.query(Pool)
>         .filter(Pool.pool == name)
>     )
>     pool_query_result = pool_query.one_or_none()
>     if not pool_query_result:
>         logger.info(f'redshift pool "{name}" does not exist - creating it')
>         session.add(pool)
>         session.commit()
>         logger.info(f'created redshift pool "{name}"')
>     else:
>         logger.info(f'redshift pool "{name}" already exists')
>
> --
>
> *Taylor Edmiston*
> Blog  | LinkedIn
>  | Stack Overflow
>  | Developer Story
> 
>
>
>
> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <
> dszakal...@whitepages.com>
> wrote:
>
> > Hi all,
> >
> > I have a DAG that creates a cluster, starts computation tasks, and after
> > they completed, tears down the cluster. I want to limit concurrency for
> the
> > computation tasks carried on this cluster to a fixed number. So logically,
> I
> > need a pool that is exclusive to the cluster created by a task. I don't
> > want interference with other DAGs or different runs of the same DAG.
> >
> > I thought I could solve this problem by creating a pool dynamically from
> a
> > task after the cluster is created and delete it once the computation
> tasks
> > are finished. I thought I could template the pool parameter of the
> > computation tasks to make them use this dynamically created pool.
> >
> > However this way the computation tasks will never be triggered. So I
> think
> > the pool parameter is saved in the task instance before being templated.
> I
> > would like to hear your thoughts on how to achieve the desired behavior.
> >
> > Thanks,
> >
> > Dávid Szakállas
> > Software Engineer | Whitepages Data Services
> >
> >
> >
> >
> >
>


Re: [External] Re: Dynamic tasks in a dag?

2018-09-14 Thread Chris Palmer
The relative paths might work from wherever you are invoking 'airflow
list_tasks', but that doesn't mean they work from wherever the webserver is
parsing the dags from.

Does running 'airflow list_tasks' from some other working directory work?
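
One way to rule out the path issue is to build the glob relative to the DAG
file itself instead of the process's working directory; reusing the names
from your snippet, something like the following (note the sql argument may
then also need adjusting so it still resolves against the template search
path):

import os
from glob import glob

DAG_DIR = os.path.dirname(os.path.abspath(__file__))

for file in glob(os.path.join(DAG_DIR, 'snowsql/create/udf/*.sql')):
    print("FILE {}".format(file))
    task = create_snowflake_operator(file, dag, 'snowflake_default')
    task.set_upstream(start)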

On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
 wrote:

> Do you mean give the full path to the files? The relative path I'm using
> definitely works. When I type airflow list_dags, I can see the output from
> the print statements that the glob is finding my sql files and creating the
> snowflake operators.
>
> airflow list_tasks workflow also lists all the operators I'm creating. I'm
> just not seeing them in the ui.
>
> On 9/14/18, 9:10 AM, "Sai Phanindhra"  wrote:
>
> Hi frank,
> Can you try giving global paths?
>
> On Fri 14 Sep, 2018, 21:35 Frank Maritato,  .invalid>
> wrote:
>
> > Hi,
> >
> > I'm using apache airflow 1.10.0 and I'm trying to dynamically
> generate
> > some tasks in my dag based on files that are in the dags directory.
> The
> > problem is, I don't see these tasks in the ui, I just see the
> 'start' dummy
> > operator. If I type 'airflow list_tasks workflow', they are listed.
> > Thoughts?
> >
> > Here is how I'm generating the tasks:
> >
> >
> > def create_snowflake_operator(file, dag, snowflake_connection):
> >     file_repl = file.replace('/', '_')
> >     file_repl = file_repl.replace('.sql', '')
> >     print("TASK_ID {}".format(file_repl))
> >     return SnowflakeOperator(
> >         dag=dag,
> >         task_id='create_{}'.format(file_repl),
> >         snowflake_conn_id=snowflake_connection,
> >         sql=file
> >     )
> >
> > DAG_NAME = 'create_objects'
> > dag = DAG(
> >     DAG_NAME,
> >     default_args=args,
> >     dagrun_timeout=timedelta(hours=2),
> >     schedule_interval=None,
> > )
> >
> > start = DummyOperator(
> >     dag=dag,
> >     task_id="start",
> > )
> >
> > print("creating snowflake operators")
> >
> > for file in glob('dags/snowsql/create/udf/*.sql'):
> >     print("FILE {}".format(file))
> >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> >     task.set_upstream(start)
> >
> > for file in glob('dags/snowsql/create/table/*.sql'):
> >     print("FILE {}".format(file))
> >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> >     task.set_upstream(start)
> >
> > for file in glob('dags/snowsql/create/view/*.sql'):
> >     print("FILE {}".format(file))
> >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> >     task.set_upstream(start)
> >
> > print("done {}".format(start.downstream_task_ids))
> >
> > Thanks in advance
> > --
> > Frank Maritato
> >
>
>
>


Re: KubernetesPodOperator: Invalid arguments were passed to BaseOperator

2018-05-30 Thread Chris Palmer
In the example, it imports BaseOperator as KubernetesPodOperator when the
kubernetes modules can't be found.
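
i.e. the example DAG guards the import with a fallback along these lines
(paraphrasing from memory, so treat it as a sketch):

try:
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
except ImportError:
    # The kubernetes extras aren't installed, so the example falls back to
    # BaseOperator under the same name; BaseOperator then warns about the
    # pod-specific kwargs it doesn't recognise.
    from airflow.models import BaseOperator as KubernetesPodOperator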

On Wed, May 30, 2018 at 3:34 PM, Craig Rodrigues 
wrote:

> For this particular DeprecationWarning, this problem is not caused
> by dependencies on kubernetes stuff.
>
> On this line:
> https://github.com/apache/incubator-airflow/blob/master/
> airflow/contrib/operators/kubernetes_pod_operator.py#L137
>
> The __init__() method for KubernetesPodOperator is calling the __init__()
> method
> for BaseOperator, and passing in values for kwargs that BaseOperator
> doesn't accept:
>
> 'name': 'airflow-test-pod',
> 'image': 'ubuntu:16.04',
> 'labels': {'foo': 'bar'},
> 'namespace': 'default',
> 'cmds': ['bash', '-cx'],
> 'arguments': ['echo', '10'],
> 'in_cluster': False,
> 'get_logs': True
>
> I don't understand how these things are being passed via kwargs?
>
> --
> Craig
>
>
> On Wed, May 30, 2018 at 1:40 AM Driesprong, Fokko 
> wrote:
>
> > Hi Craig,
> >
> > This is something that needs to be fixed. I agree with you this is very
> > dirty. In your installation you're not installing the kubernetes stuff,
> so
> > the KubernetesPodOperator is ignored. We need to figure out how to have
> > example dags that are not compatible with the vanilla installation, or we
> > need to remove the kubernetes example for now, and move it to the
> > documentation.
> >
> > Cheers, Fokko
> >
> >
>


Re: 答复: How to know the DAG is starting to run

2018-05-11 Thread Chris Palmer
It's not even clear to me what it means for a DAG to start running. The
creation of a DagRun for a specific execution date is completely
independent of the scheduling of any TaskInstances for that DagRun. There
could be a significant delay between those two events, either deliberately
encoded into the DAG or due to resource constraints.

What event are you actually interested in knowing about? The creation of a
DagRun? The starting of any task for a DagRun? Something else?

Maybe if you provided more details on exactly what "pipeline environment
setup" you are trying to do, it would help others understand the problem
you are trying to solve.

Chris

On Fri, May 11, 2018 at 10:59 AM, Song Liu  wrote:

> Overriding "DAG.run" sounds like a workaround, so that if it's running
> the first operation of the DAG it can do some setup, etc.
>
> 
> 发件人: Victor Noagbodji 
> 发送时间: 2018年5月11日 12:50
> 收件人: dev@airflow.incubator.apache.org
> 主题: Re: How to know the DAG is starting to run
>
> Hey,
>
> I don't know if airflow has a concept of DAG-level events or callbacks.
> (Operators do have callbacks though.). You might get away with subclassing
> the DAG class or having a class decorator.
>
> The source suggests that ".run()" is the method you want to override. You
> may want to call the original "super().run()" then do what you need to do
> afterwards.
>
> Let's see if that works for you.
>
> > On May 11, 2018, at 8:26 AM, Song Liu  wrote:
> >
> > Yes, I have thought about this approach, but a more elegant way is doing
> it in the DAG, since we don't want to add this "pipeline environment setup"
> as a single operator; it should be done in the DAG more gracefully.
> > 
> > 发件人: James Meickle 
> > 发送时间: 2018年5月11日 12:09
> > 收件人: dev@airflow.incubator.apache.org
> > 主题: Re: How to know the DAG is starting to run
> >
> > Song:
> >
> > You can put an operator as the very first node in the DAG, and have
> > everything else in the DAG depend on it. For example, this is the
> approach
> > we use to only execute DAG tasks on stock market trading days.
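
A bare-bones version of that pattern (the callable body is a placeholder):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def setup_pipeline_environment(**context):
    # per-DagRun, run-once setup goes here
    pass


dag = DAG('pipeline', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

setup = PythonOperator(
    task_id='setup_pipeline_environment',
    python_callable=setup_pipeline_environment,
    provide_context=True,
    dag=dag,
)

# every other task in the DAG is made downstream of `setup`, e.g.:
# some_task.set_upstream(setup)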
> >
> > -James M.
> >
> > On Fri, May 11, 2018 at 3:57 AM, Song Liu  wrote:
> >
> >> Hi,
> >>
> >> I have something I want to be done only once when the DAG is constructed,
> >> but it seems that the DAG will be instantiated every time each operator
> >> is run.
> >>
> >> So is there a function in the DAG that tells us it is starting to run
> >> now?
> >>
> >> Thanks,
> >> Song
> >>
>
>


Re: rename DAG, and keep the history

2018-03-07 Thread Chris Palmer
You'd have to connect to the database storing all the Airflow metadata,
find all the tables with a 'dag_id' column and update all the rows for the
old name to match the new name.
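
Something like the following, as a rough ORM-based sketch; the exact set of
models/tables with a dag_id column depends on your Airflow version, so check
the schema (and back up the database) before doing anything like this:

from airflow import settings
from airflow.models import DagModel, DagRun, Log, TaskInstance, XCom

OLD_DAG_ID = 'old_dag_id'   # placeholders
NEW_DAG_ID = 'new_dag_id'

session = settings.Session()
for model in (DagModel, DagRun, TaskInstance, Log, XCom):
    # rewrite the dag_id on every row that references the old name
    session.query(model) \
        .filter(model.dag_id == OLD_DAG_ID) \
        .update({'dag_id': NEW_DAG_ID}, synchronize_session=False)
session.commit()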

Chris

On Wed, Mar 7, 2018 at 2:30 PM, Michael Gong 
wrote:

> hi, all,
> due to some reasons, we need rename a dag.
> is it possible to keep the dag run history which used the old name?
>
> thanks.
> michael
>


Re: How to add hooks for strong deployment consistency?

2018-02-28 Thread Chris Palmer
I'll preface this with the fact that I'm relatively new to Airflow, and
haven't played around with a lot of the internals.

I find the idea of a DagFetcher interesting, but should we worry about
slowing down the scheduler significantly? If the scheduler is having to
"fetch" multiple different DAG versions, be it git refs or artifacts from
Artifactory, we are talking about adding significant time to each scheduler
run. Also how would the scheduler know which DAGs to fetch from where if
there aren't local files on disk listing those DAGs? Maybe I'm missing
something in the implementation.

It seems to me that the fetching of the different versions should be
delegated to the Task (or TaskInstance) itself. That ensures we only spend
the time to "fetch" the version that is needed when it is needed. One down
side might be that each TaskInstance running for the same version of the
DAG might end up doing the "fetch" independently (duplicating that work).

I think this could be done by adding some version attribute to the DagRun
that gets set at creation, and have the scheduler pass that version to the
TaskInstances when they are created. You could even extend this so that you
could have an arbitrary set of "executor_parameters" that get set on a
DagRun and are passed to TaskInstances. Then the specific Executor class
that is running that TaskInstance could handle the "executor_parameters" as
it sees fit.

One thing I'm not clear on is how and when TaskInstances are created. When
the scheduler first sees a specific DagRun do all the TaskInstances get
created immediately, but only some of them get queued? Or does the
scheduler only create those TaskInstances which can be queued right now?

In particular if a DagRun gets created and while it is running the DAG is
updated and a new Task is added, will the scheduler pick up that new Task
for the running DagRun? If the answer is yes, then my suggestion above
would run the risk of scheduling a Task for a DAG version where that Task
didn't exist. I'm sure you could handle that somewhat gracefully but it's a
bit ugly.

Chris

On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> At a higher level I want to say a few things about the idea of enforcing
> version consistency within a DagRun.
>
> One thing we've been talking about is the need for a "DagFetcher"
> abstraction, where its first implementation, which would replace and mimic
> the current one, would be "FileSystemDagFetcher". One specific DagFetcher
> implementation may or may not support version semantics, but if it does it
> should be able to receive a version id and return the proper version of the
> DAG object. For instance, that first "FileSystemDagFetcher" would not
> support version semantics, but perhaps a "GitRepoDagFetcher" would, or an
> "ArtifactoryDagFetcher", or "TarballInS3DagFetcher" may as well.
>
> Of course that assumes that the scheduler knows and stores the active
> version number when generating a new DagRun, and for that information to be
> leveraged on subsequent scheduler cycles and on workers when task are
> executed.
>
> This could also enable things like "remote" backfills (non local,
> parallelized) of a DAG definition that's on an arbitrary git ref (assuming
> a "GitRepoDagFetcher").
>
> There are [perhaps] unintuitive implications where clearing a single task
> would then re-run the old DAG definition on that task (since the version
> was stamped in the DagRun and hasn't changed), but deleting/recreating a
> DagRun would run the latest version (or any other version that may be
> specified for that matter).
>
> I'm unclear on how much work that represents exactly, but it's certainly
> doable and may only require to change part of the DagBag class and a few
> other places.
>
> Max
>
> On Tue, Feb 27, 2018 at 6:48 PM, David Capwell  wrote:
>
> > Thanks for your feedback!
> >
> > Option 1 is a non-starter for us. The reason is we have DAGs that take 9+
> > hours to run.
> >
> > Option 2 is more where my mind was going, but it's rather large.  How I
> see
> > it you need a MVCC DagBag that's aware of multiple versions (what
> provides
> > version?).  Assuming you can track active dag runs pointing to which
> > versions you know how to cleanup (fine with external).  The pro here is
> you
> > have snapshot isolation for dag_run, con is more bookkeeping and require
> > deploy to work with this (last part may be a good thing though).
> >
> > The only other option I can think of is to lock deploy so the system only
> > picks up new versions when no dag_run holds the lock.  This is flawed for
> > many reasons, but breaks horrible for dag_runs that takes minutes (I
> assume
> > 99% do).
> >
> >
> >
> > On Tue, Feb 27, 2018, 4:50 PM Joy Gao  wrote:
> >
> > > Hi David!
> > >
> > > Thank you for clarifying, I think I understand your concern now. We
> > > currently also work around this by making sure a dag is turned off
> > > when we deploy a new version. We also make sur

Re: Airflow at SREcon?

2018-02-23 Thread Chris Palmer
Not directly on topic to your email, but Fitbit has started using Airflow
for some things. In particular the Data Engineering team, which I'm a
member of, and is based in Boston, is starting to use it for much of our ETL
processes.

Chris

On Fri, Feb 23, 2018 at 3:23 PM, James Meickle 
wrote:

> Quantopian is aiming to switch over to Airflow over the course of this
> year, replacing our existing systems for ingesting financial data. I'll be
> at SREcon Americas this year, potentially with some of my coworkers; we're
> located in Boston, which doesn't seem to have many users yet, so we'd love
> to get to know some other users while we're on the west coast!
>
> It looks like we'll be missing the next Bay Area meetup, but perhaps some
> Airflow users will also be at SREcon?
>
> -James M.
>


Re: Scheduler won't schedule past minimum end_date of tasks

2018-02-22 Thread Chris Palmer
I don't think there is an issue with TIDeps, there is already a
RunnableExecDateDep
<https://github.com/apache/incubator-airflow/blob/master/airflow/ti_deps/deps/runnable_exec_date_dep.py>
that covers the end dates of tasks.

Here is my toy example that I used to test this behavior -
https://pastebin.com/G9naqXhq

Here is a screenshot of the DagRuns:

The first two DagRuns are for 2018-02-01 and 2018-02-02, and were created
by the scheduler, and both tasks ran as expected.

The third run is for 2018-02-03 and was created by me running 'airflow
trigger_dag'. As you can see only the first task got run, which is what I
would have expected. 'airflow task_failed_deps' for the second task gives
the expected answer:

$ airflow task_failed_deps testing_end_dates demo_task_2 2018-02-03 -sd
dags/examples
[2018-02-22 15:32:52,966] {__init__.py:57} INFO - Using executor
CeleryExecutor
...
...
Task instance dependencies not met:
Execution Date: The execution date is 2018-02-03T00:00:00 but this is after
the task's end date 2018-02-02T00:00:00.

Although an additional problem is that the DagRun is still listed as
'running', presumably because the second task hasn't completed (even though
it never will). It's not totally clear to me what the state of that second
task instance should be; maybe 'no status' is fine, or maybe it could be
marked as 'skipped' automatically.

I think the over all solution would be to:

   - update the scheduler so that it will create DagRuns if the end date
   of any task is None or in the future
   - figure out what state task instances whose end date is before the
   execution date should be in
   - update DagRun model to ignore task instances with end dates before the
   execution date when updating the DagRun state

The first of those seems pretty easy to me, the second is mostly just
semantics, but the third seems a bit more complicated (after a first glance
at that part of the code). I'll file a ticket and maybe find some time to
work on a PR; I only started using Airflow recently and would like to start
contributing back to it.

Chris



On Thu, Feb 22, 2018 at 4:56 AM, Ash Berlin-Taylor <
ash_airflowl...@firemirror.com> wrote:

> That does sound like a bug, and I would have expected, as you did, that
> not specifying an end_date on some tasks means those tasks should run for
> ever.
>
> One change that probably needs making is that an end_date of None on a
> task should be "greater" than other task dates in/around the lines you
> linked to.
>
> Do we need to add a TIDep https://github.com/apache/
> incubator-airflow/tree/master/airflow/ti_deps/deps <
> https://github.com/apache/incubator-airflow/tree/master/
> airflow/ti_deps/deps> to ensure the exec date is less than the task end
> date?
>
> -ash
>
> > On 21 Feb 2018, at 20:58, Chris Palmer  wrote:
> >
> > I was very surprised to find that if you set an end_date on any of the
> > tasks in a DAG, that the scheduler won't create DagRuns after the minimum
> > end_date of tasks. The code that does this is the 6 or so lines starting
> > here -
> > https://github.com/apache/incubator-airflow/blob/master/
> airflow/jobs.py#L867
> > .
> >
> > So if for example I have:
> >
> >   - a DAG with a start_date of 2018-02-01, no specific end_date and a
> >   daily schedule
> >   - One task in that DAG with no specified end_date
> >   - A second task in that DAG with an end_date of 2018-02-02
> >
> > The scheduler will create DagRuns for 2018-02-01 and 2018-02-02 but
> will
> > not create a DagRun for 2018-02-03 or later.
> >
> > That seems completely counter intuitive to me. I would expect the
> scheduler
> > to keep creating DagRuns so that the first task can keep running.
> >
> >
> > Interestingly, if I manually created a DagRun for 2018-02-03 then the
> > scheduler would then only schedule the first task for that
> execution_date
> > and actually respects the end_date of the second task.
> >
> > The only alternative to adding an end_date to a task is to edit the DAG
> and
> > remove those tasks from the DAG entirely. However, that means the
> webserver
> > is no longer aware of those tasks and I can't look at the historical
> > behavior in the UI.
> >
> >
> > Does anyone have explanation for why this logic is there? Is there some
> > necessary use case for that restriction that I'm not thinking about?
> >
> >
> > I could see a similar piece of code that checks to see if all tasks in
> the
> > DAG have specified end_dates and prevents the scheduler from creating
> > DagRuns passed the MAX of those dates. There is no point in creating
> > DagRuns if none of the tasks are going to be run, but as long as at least
> > one task can run for that execution_date I think the scheduler should
> > create it.
> >
> > Thanks
> > Chris
>
>


Scheduler won't schedule past minimum end_date of tasks

2018-02-21 Thread Chris Palmer
I was very surprised to find that if you set an end_date on any of the
tasks in a DAG, that the scheduler won't create DagRuns after the minimum
end_date of tasks. The code that does this is the 6 or so lines starting
here -
https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L867
.

So if for example I have:

   - a DAG with a start_date of 2018-02-01, no specific end_date and a
   daily schedule
   - One task in that DAG with no specified end_date
   - A second task in that DAG with an end_date of 2018-02-02

The scheduler will create DagRuns for 2018-02-01 and 2018-02-02 but will
not create a DagRun for 2018-02-03 or later.
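
For concreteness, a minimal DAG matching that description looks something
like this:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='testing_end_dates',
    start_date=datetime(2018, 2, 1),
    schedule_interval='@daily',
)

# no end_date of its own, so it should keep running indefinitely
demo_task_1 = DummyOperator(task_id='demo_task_1', dag=dag)

# should stop being scheduled after 2018-02-02
demo_task_2 = DummyOperator(
    task_id='demo_task_2',
    end_date=datetime(2018, 2, 2),
    dag=dag,
)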

That seems completely counter intuitive to me. I would expect the scheduler
to keep creating DagRuns so that the first task can keep running.


Interestingly, if I manually created a DagRun for 2018-02-03 then the
scheduler would then only schedule the first task for that execution_date
and actually respects the end_date of the second task.

The only alternative to adding an end_date to a task is to edit the DAG and
remove those tasks from the DAG entirely. However, that means the webserver
is no longer aware of those tasks and I can't look at the historical
behavior in the UI.


Does anyone have explanation for why this logic is there? Is there some
necessary use case for that restriction that I'm not thinking about?


I could see a similar piece of code that checks to see if all tasks in the
DAG have specified end_dates and prevents the scheduler from creating
DagRuns past the MAX of those dates. There is no point in creating
DagRuns if none of the tasks are going to be run, but as long as at least
one task can run for that execution_date I think the scheduler should
create it.

Thanks
Chris


Re: Close SqlSensor Connection

2018-01-16 Thread Chris Palmer
I'm not sure this is the right solution. I haven't explored all the code
but it seems to me that the actual database connections are managed in the
hooks. Different databases will want to handle this differently, and
modifying SqlSensor seems too heavy handed to me.


If you look at the get_records method of the generic DbApiHook (
https://pythonhosted.org/airflow/_modules/dbapi_hook.html#DbApiHook) then
you can see it does close the connections it is making.
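
For reference, the pattern in the generic hook looks roughly like this; the
connection and cursor are both closed when the blocks exit, even if the query
raises (a paraphrase, not the exact implementation):

from contextlib import closing


def get_records_with_cleanup(hook, sql, parameters=None):
    # mirrors what DbApiHook.get_records does
    with closing(hook.get_conn()) as conn:
        with closing(conn.cursor()) as cur:
            if parameters is not None:
                cur.execute(sql, parameters)
            else:
                cur.execute(sql)
            return cur.fetchall()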

Is there a specific hook for Teradata, and have you looked at how that is
handling connections?

Chris


On Jan 16, 2018 9:32 AM, "Bolke de Bruin"  wrote:

Of course. PR welcome. It would be nice to know how to test for connection
leakage.

Sent from my iPhone

> On 16 Jan 2018, at 16:01, Alexis Rolland 
wrote:
>
> Hello everyone,
>
> I’m reaching out to discuss / suggest a small improvement in the class
SqlSensor:
> https://pythonhosted.org/airflow/_modules/airflow/operators/sensors.html
>
> We are currently using SqlSensors on top of Teradata in several DAGs.
When the DAGs execute we receive the following error message from Teradata
engine: Error 8024 All virtual circuits are currently in use.
> This error message would typically appear when we reach the maximum
number of simultaneous connections to the database.
>
> I suspect the SqlSensor task creates a lot of connections -
basically one every time it (re)tries - and that these connections end up in
an idle state.
> Does closing the connection at the end of the SqlSensor poke method
sound feasible?
>
> I’d like to take this opportunity as well to thank you for the awesome
work you’ve been doing with Airflow.
> Keep it up!
>
> Best,
>
>
>