[jira] [Created] (AIRFLOW-6528) disable W503 flake8 check (line break before binary operator)

2020-01-09 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-6528:


 Summary: disable W503 flake8 check (line break before binary 
operator)
 Key: AIRFLOW-6528
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6528
 Project: Apache Airflow
  Issue Type: Bug
  Components: pre-commit
Affects Versions: 1.10.7
Reporter: Daniel Standish


Flake8's W503 rule says there should be no line break before a binary operator.

This rule is incompatible with the black formatter, and is also, in my opinion, bad 
style.

Status quo example with W503 check enabled:
{code}
@property
def sqlalchemy_scheme(self):
    """
    Database provided in init if exists; otherwise, ``schema`` from
    ``Connection`` object.
    """
    return (
        self._sqlalchemy_scheme or
        self.connection_extra_lower.get('sqlalchemy_scheme') or
        self.DEFAULT_SQLALCHEMY_SCHEME
    )
{code}

As required by black (W503 disabled):
{code}
@property
def sqlalchemy_scheme(self):
    """
    Database provided in init if exists; otherwise, ``schema`` from
    ``Connection`` object.
    """
    return (
        self._sqlalchemy_scheme
        or self.connection_extra_lower.get('sqlalchemy_scheme')
        or self.DEFAULT_SQLALCHEMY_SCHEME
    )
{code}









--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6517) make merge_dicts recursive

2020-01-08 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-6517:


 Summary: make merge_dicts recursive
 Key: AIRFLOW-6517
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6517
 Project: Apache Airflow
  Issue Type: Bug
  Components: utils
Affects Versions: 1.10.7
Reporter: Daniel Standish
Assignee: Daniel Standish
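
Purely as an illustration, a recursive merge of two dicts might look something like this sketch (not necessarily the final implementation):
{code:python}
def merge_dicts(dict1, dict2):
    """Return a new dict with dict2 merged into dict1, recursing into nested dicts."""
    merged = dict1.copy()
    for key, value in dict2.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_dicts(merged[key], value)
        else:
            merged[key] = value
    return merged
{code}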






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6398) improve flakey test test_mark_success_no_kill

2019-12-29 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-6398:


 Summary: improve flakey test test_mark_success_no_kill
 Key: AIRFLOW-6398
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6398
 Project: Apache Airflow
  Issue Type: Bug
  Components: tests
Affects Versions: 1.10.7
Reporter: Daniel Standish
Assignee: Daniel Standish


The test {{test_mark_success_no_kill}} fails regularly.

Part of the problem is that it depends on the timing of a subprocess and database 
operations.

We can reduce complexity by using a python operator instead of a bash operator. 
Currently it uses a bash operator to call {{sleep 600}}.

This introduces an unnecessary layer of multiprocessing.  There is also a bug 
in the bash operator which was exacerbating the problem [AIRFLOW-6397]
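
A rough sketch of the simpler setup, with a long-running python callable in place of the {{sleep 600}} bash command (illustrative only, not the exact test code):
{code:python}
import time

from airflow.operators.python_operator import PythonOperator

def sleep_a_while():
    # long-running task the test can mark as success and then kill
    time.sleep(600)

task = PythonOperator(
    task_id='task1',
    python_callable=sleep_a_while,
    dag=dag,  # assumes the test's DAG object
)
{code}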









--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6397) Check for `sub_process` before trying to get pid in bash operator on kill

2019-12-29 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-6397:


 Summary: Check for `sub_process` before trying to get pid in bash 
operator on kill
 Key: AIRFLOW-6397
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6397
 Project: Apache Airflow
  Issue Type: Bug
  Components: operators
Affects Versions: 1.10.7
Reporter: Daniel Standish
Assignee: Daniel Standish


The test {{test_mark_success_no_kill}} in {{TestLocalTaskJob}} is very flakey.

I found that one reason is that the test may attempt to kill the task before 
the subprocess is created and stored as an attribute.

For example:
{code}
[2019-12-29 15:03:51,963] {bash_operator.py:116} INFO - Running command: sleep 600
[2019-12-29 15:03:51,967] {helpers.py:315} INFO - Sending Signals.SIGTERM to GPID 57093
[2019-12-29 15:03:51,968] {taskinstance.py:913} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-12-29 15:03:51,970] {bash_operator.py:143} INFO - Sending SIGTERM signal to bash process group
[2019-12-29 15:03:51,970] {taskinstance.py:913} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-12-29 15:03:51,982] {bash_operator.py:143} INFO - Sending SIGTERM signal to bash process group
[2019-12-29 15:03:51,982] {taskinstance.py:1078} ERROR - 'BashOperator' object has no attribute 'sub_process'
Traceback (most recent call last):
  File "/Users/dstandish/code/airflow/airflow/models/taskinstance.py", line 945, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/Users/dstandish/code/airflow/airflow/operators/bash_operator.py", line 123, in execute
    preexec_fn=pre_exec)
  File "/Users/dstandish/.pyenv/versions/3.7.5/lib/python3.7/subprocess.py", line 800, in __init__
    restore_signals, start_new_session)
  File "/Users/dstandish/.pyenv/versions/3.7.5/lib/python3.7/subprocess.py", line 1505, in _execute_child
    part = os.read(errpipe_read, 5)
  File "/Users/dstandish/code/airflow/airflow/models/taskinstance.py", line 914, in signal_handler
    task_copy.on_kill()
  File "/Users/dstandish/code/airflow/airflow/operators/bash_operator.py", line 144, in on_kill
    os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
AttributeError: 'BashOperator' object has no attribute 'sub_process'
[2019-12-29 15:03:51,988] {taskinstance.py:1123} INFO - Marking task as FAILED.dag_id=test_mark_success, task_id=task1, execution_date=20160101T00, start_date=20191229T230351, end_date=20191229T230351
[2019-12-29 15:03:52,022] {helpers.py:281} INFO - Process psutil.Process(pid=57093, status='terminated') (57093) terminated with exit code 1
{code}
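
A minimal sketch of the proposed guard in {{on_kill}} (illustrative; the actual patch may differ):
{code:python}
def on_kill(self):
    # ``os`` and ``signal`` are already imported at module level in bash_operator.py
    self.log.info('Sending SIGTERM signal to bash process group')
    if hasattr(self, 'sub_process') and self.sub_process:
        os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
{code}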




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6296) add mssql odbc hook

2019-12-18 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-6296:


 Summary: add mssql odbc hook
 Key: AIRFLOW-6296
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6296
 Project: Apache Airflow
  Issue Type: Bug
  Components: hooks
Affects Versions: 1.10.7
Reporter: Daniel Standish
Assignee: Daniel Standish
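
Purely as an illustration, connecting to SQL Server over ODBC with {{pyodbc}} might look like this sketch (the driver name and parameters are assumptions):
{code:python}
import pyodbc

def get_mssql_odbc_conn(host, schema, login, password, driver='ODBC Driver 17 for SQL Server'):
    # the driver name depends on which ODBC driver is installed locally
    conn_str = (
        'DRIVER={{{driver}}};SERVER={host};DATABASE={schema};UID={login};PWD={password}'
        .format(driver=driver, host=host, schema=schema, login=login, password=password)
    )
    return pyodbc.connect(conn_str)
{code}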






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6254) obscure conn extra in logs

2019-12-13 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-6254:


 Summary: obscure conn extra in logs
 Key: AIRFLOW-6254
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6254
 Project: Apache Airflow
  Issue Type: Bug
  Components: core
Affects Versions: 1.10.6
Reporter: Daniel Standish
Assignee: Daniel Standish
 Fix For: 1.10.7


When {{BaseHook.get_connection}} is called, it calls {{conn.debug_info()}} on the 
returned {{conn}} object.

This prints the full contents of {{conn.extra}} to the log.

This is problematic because there can be sensitive information in 
{{conn.extra}}.

The present change resolves this by adding a method {{conn.log_info}} which 
obscures {{extra}}, and calling that in {{get_connection}} instead of 
{{debug_info}}.

The {{debug_info}} method itself is left unchanged.
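
A rough sketch of what such a masking method could look like (the method name comes from this issue; the exact fields and format shown are assumptions, not the actual implementation):
{code:python}
def log_info(self):
    # like debug_info, but mask sensitive fields before they reach the logs
    return (
        "id: {}. Host: {}, Port: {}, Schema: {}, Login: {}, "
        "Password: {}, extra: {}".format(
            self.conn_id, self.host, self.port, self.schema, self.login,
            'XXXXXXXX' if self.password else None,
            'XXXXXXXX' if self.extra else None,
        )
    )
{code}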






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-5793) add test for multiple alembic revision heads

2019-10-27 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-5793:


 Summary: add test for multiple alembic revision heads
 Key: AIRFLOW-5793
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5793
 Project: Apache Airflow
  Issue Type: Improvement
  Components: tests
Affects Versions: 1.10.5
Reporter: Daniel Standish
Assignee: Daniel Standish


Depending on the timing of merges with migrations, we can end up with two 
revision heads that need to be merged.

This adds a test to detect when multiple heads are present.
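
A minimal sketch of such a test, assuming it mirrors how {{airflow.utils.db}} configures alembic (the paths and the test name are assumptions):
{code:python}
import os

import airflow
from alembic.config import Config
from alembic.script import ScriptDirectory

def test_only_one_alembic_head():
    package_dir = os.path.dirname(airflow.__file__)
    config = Config(os.path.join(package_dir, 'alembic.ini'))
    config.set_main_option('script_location', os.path.join(package_dir, 'migrations'))
    script = ScriptDirectory.from_config(config)
    # more than one head means someone needs to add a merge migration
    assert len(script.get_heads()) == 1
{code}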




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-5768) Google Cloud SQL - Don't store ephemeral connection object to database

2019-10-26 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-5768:


 Summary: Google Cloud SQL - Don't store ephemeral connection 
object to database
 Key: AIRFLOW-5768
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5768
 Project: Apache Airflow
  Issue Type: Improvement
  Components: gcp
Affects Versions: 1.10.5
Reporter: Daniel Standish
Assignee: Daniel Standish


The GCP Cloud SQL operator dynamically creates an ephemeral Connection object, 
persists it to the metastore during execution, and deletes it afterward.

This behavior has a negative impact on our ability to refactor creds management.

By not persisting to the database, we can also remove some complexity around 
ensuring the connection is deleted in the event of failure, and the tests that 
go along with that.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5768) Google Cloud SQL - Don't store ephemeral connection object to database

2019-10-26 Thread Daniel Standish (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-5768:
-
Description: 
GCP cloud sql operator creates dynamically an ephemeral Connection object.  It 
persists to metastore during execution and deletes afterward.  

This behavior has negative impact on our ability to refactor creds management.  

By not persisting to database, we can also remove some complexity re ensuring 
connection is deleted in event of failure, and the tests that go along with 
that.

It does require that we add optional param `connection` to both MySqlHook and 
PostgresHook.


  was:
GCP cloud sql operator creates dynamically an ephemeral Connection object.  It 
persists to metastore during execution and deletes afterward.  

This behavior has negative impact on our ability to refactor creds management.  

By not persisting to database, we can also remove some complexity re ensuring 
connection is deleted in event of failure, and the tests that go along with 
that.


> Google Cloud SQL - Don't store ephemeral connection object to database
> --
>
> Key: AIRFLOW-5768
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5768
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: gcp
>Affects Versions: 1.10.5
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Major
>
> GCP cloud sql operator creates dynamically an ephemeral Connection object.  
> It persists to metastore during execution and deletes afterward.  
> This behavior has negative impact on our ability to refactor creds 
> management.  
> By not persisting to database, we can also remove some complexity re ensuring 
> connection is deleted in event of failure, and the tests that go along with 
> that.
> It does require that we add optional param `connection` to both MySqlHook and 
> PostgresHook.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5768) Google Cloud SQL - Don't store ephemeral connection object to database

2019-10-26 Thread Daniel Standish (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-5768:
-
Description: 
The GCP Cloud SQL operator dynamically creates an ephemeral Connection object, 
persists it to the metastore during execution, and deletes it afterward.

This behavior has a negative impact on our ability to refactor creds management.

By not persisting to the database, we can also remove some complexity around 
ensuring the connection is deleted in the event of failure, and the tests that 
go along with that.

This change requires that we add an optional param `connection` to both MySqlHook 
and PostgresHook.
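
A rough sketch of what an optional {{connection}} param could look like (hypothetical subclass shown only for illustration; the actual change may wire this differently):
{code:python}
from airflow.hooks.mysql_hook import MySqlHook

class MySqlHookWithConnection(MySqlHook):
    """Sketch: accept an in-memory Connection instead of reading one from the metastore."""

    def __init__(self, connection=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._connection = connection

    def get_connection(self, conn_id):
        # fall back to the normal metastore lookup when no object was supplied
        return self._connection or super().get_connection(conn_id)
{code}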


  was:
GCP cloud sql operator creates dynamically an ephemeral Connection object.  It 
persists to metastore during execution and deletes afterward.  

This behavior has negative impact on our ability to refactor creds management.  

By not persisting to database, we can also remove some complexity re ensuring 
connection is deleted in event of failure, and the tests that go along with 
that.

It does require that we add optional param `connection` to both MySqlHook and 
PostgresHook.



> Google Cloud SQL - Don't store ephemeral connection object to database
> --
>
> Key: AIRFLOW-5768
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5768
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: gcp
>Affects Versions: 1.10.5
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Major
>
> GCP cloud sql operator creates dynamically an ephemeral Connection object.  
> It persists to metastore during execution and deletes afterward.  
> This behavior has negative impact on our ability to refactor creds 
> management.  
> By not persisting to database, we can also remove some complexity re ensuring 
> connection is deleted in event of failure, and the tests that go along with 
> that.
> This change requires that we add optional param `connection` to both 
> MySqlHook and PostgresHook.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5751) add get_uri method to Connection object

2019-10-24 Thread Daniel Standish (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-5751:
-
Description: 
Airflow can use connections stored either in the database or in URI form in 
environment variables.

We can add a convenience method `get_uri` on the `Connection` object to generate 
the URI for a connection.

This can help users because it is sometimes a little tricky / not obvious how 
to generate the URI format.

I think it could also be nice if each hook had a `get_uri` method that would 
take all relevant params and produce a correctly encoded URI.  If that were 
implemented, it could use this function for that purpose.
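
As a rough illustration of the encoding involved (a hypothetical helper, not the actual implementation), assembling a URI from a connection's fields might look like this:
{code:python}
from urllib.parse import quote_plus

def build_conn_uri(conn_type, host, login=None, password=None, port=None, schema=None):
    # percent-encode credentials so special characters survive the round trip
    uri = '{}://'.format(conn_type)
    if login:
        uri += quote_plus(login)
        if password:
            uri += ':' + quote_plus(password)
        uri += '@'
    uri += host or ''
    if port:
        uri += ':{}'.format(port)
    if schema:
        uri += '/' + schema
    return uri

# build_conn_uri('postgres', 'db.example.com', 'user', 'p@ss/word', 5432, 'mydb')
# -> 'postgres://user:p%40ss%2Fword@db.example.com:5432/mydb'
{code}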



  was:
Airflow can either use connections stored in database stored in URI form in 
environment variables.

We can add a convenience method `get_uri` on `Connection` object to generate 
the URI for a connection.  

This can help users because sometimes it is a little tricky / not obvious how 
to generate the URI format.


> add get_uri method to Connection object
> ---
>
> Key: AIRFLOW-5751
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5751
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: core
>Affects Versions: 1.10.5
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Major
>
> Airflow can either use connections stored in database stored in URI form in 
> environment variables.
> We can add a convenience method `get_uri` on `Connection` object to generate 
> the URI for a connection.  
> This can help users because sometimes it is a little tricky / not obvious how 
> to generate the URI format.
> I think it could also be nice if each hook had a `get_uri` method that would 
> take all relevant params and produce a correctly encoded URI.  If that were 
> implemented, it could use this function for that purpose. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-5752) flakey slack build message test

2019-10-24 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-5752:


 Summary: flakey slack build message test
 Key: AIRFLOW-5752
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5752
 Project: Apache Airflow
  Issue Type: Test
  Components: tests
Affects Versions: 1.10.5
Reporter: Daniel Standish


On python 3.5 the test {{test_build_slack_message}} fails sometimes because of 
the indeterminate ordering of dicts.

We can resolve this by loading the JSON message into a dict and using `assertDictEqual`.
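
For example, assuming the test currently compares raw JSON strings (variable names are illustrative):
{code:python}
import json

# instead of: self.assertEqual(expected_message_str, actual_message_str)
self.assertDictEqual(json.loads(expected_message_str), json.loads(actual_message_str))
{code}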



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5751) add get_uri method to Connection object

2019-10-24 Thread Daniel Standish (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-5751:
-
Summary: add get_uri method to Connection object  (was: add get_uri method 
to connections object)

> add get_uri method to Connection object
> ---
>
> Key: AIRFLOW-5751
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5751
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: core
>Affects Versions: 1.10.5
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Major
>
> Airflow can either use connections stored in database stored in URI form in 
> environment variables.
> We can add a convenience method `get_uri` on `Connection` object to generate 
> the URI for a connection.  
> This can help users because sometimes it is a little tricky / not obvious how 
> to generate the URI format.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-5751) add get_uri method to connections object

2019-10-24 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-5751:


 Summary: add get_uri method to connections object
 Key: AIRFLOW-5751
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5751
 Project: Apache Airflow
  Issue Type: New Feature
  Components: core
Affects Versions: 1.10.5
Reporter: Daniel Standish
Assignee: Daniel Standish


Airflow can use connections stored either in the database or in URI form in 
environment variables.

We can add a convenience method `get_uri` on the `Connection` object to generate 
the URI for a connection.

This can help users because it is sometimes a little tricky / not obvious how 
to generate the URI format.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5720) don't call _get_connections_from_db in TestCloudSqlDatabaseHook

2019-10-22 Thread Daniel Standish (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-5720:
-
Description: 
Issues with this test class:

*Tests are mocking at a lower level than they need to*

Tests were mocking {{airflow.hook.BaseHook.get_connections}}. Instead they can 
mock {{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}}, which 
is more direct.

*Tests should not reference a private method*

This is an impediment to refactoring connections / creds.

*Tests had complexity that did not add a benefit*

They all had this bit:
{code:python}
self._setup_connections(get_connections, uri)
gcp_conn_id = 'google_cloud_default'
hook = CloudSqlDatabaseHook(
    default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(
        'extra__google_cloud_platform__project')
)
{code}

{{_setup_connections}} was like this:
{code:python}
@staticmethod
def _setup_connections(get_connections, uri):
    gcp_connection = mock.MagicMock()
    gcp_connection.extra_dejson = mock.MagicMock()
    gcp_connection.extra_dejson.get.return_value = 'empty_project'
    cloudsql_connection = Connection()
    cloudsql_connection.parse_from_uri(uri)
    cloudsql_connection2 = Connection()
    cloudsql_connection2.parse_from_uri(uri)
    get_connections.side_effect = [[gcp_connection], [cloudsql_connection],
                                   [cloudsql_connection2]]
{code}

The issues here are as follows.

1. No test ever used the third side effect.

2. The first side effect does not help us; {{default_gcp_project_id}} is 
irrelevant.

Only one of the three connections in {{_setup_connections}} has any impact on 
the test.

The call to {{BaseHook.get_connection}} only serves to discard the first 
connection in the mock side effect list, {{gcp_connection}}.

The second connection is the one that matters, and it is returned when 
{{CloudSqlDatabaseHook}} calls `self.get_connection` during init.  Since it is 
a mock side effect, it doesn't matter what value is given for conn_id.  So the 
{{CloudSqlDatabaseHook}} init param {{default_gcp_project_id}} has no 
consequence.  And because it has no consequence, we should not supply a value 
for it, because doing so is misleading.

We should not have extra code that does not serve a purpose, because it makes it 
harder to understand what's actually happening.
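
A sketch of the more direct mocking (the test name and surrounding fixture are assumptions):
{code:python}
@mock.patch('airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection')
def test_cloudsql_database_hook(self, get_connection):
    cloudsql_connection = Connection()
    cloudsql_connection.parse_from_uri(uri)  # ``uri`` supplied by the test's parameters
    get_connection.return_value = cloudsql_connection
    hook = CloudSqlDatabaseHook()
    # ... assertions against ``hook`` go here
{code}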



  was:
Issues with this test class:

*tests are mocking lower-level than they need to*
Tests were mocking {{airflow.hook.BaseHook.get_connections}}.
Instead they can mock 
{{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is 
more direct.

*should not reference private method*

This is an impediment to refactoring of connections / creds.

*Tests had complexity that did not add a benefit*

They all had this bit:
{code:python}
self._setup_connections(get_connections, uri)
gcp_conn_id = 'google_cloud_default'
hook = CloudSqlDatabaseHook(

default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(
'extra__google_cloud_platform__project')
)
{code}

{{_setup_connections}} was like this:
{code:python}
@staticmethod
def _setup_connections(get_connections, uri):
gcp_connection = mock.MagicMock()
gcp_connection.extra_dejson = mock.MagicMock()
gcp_connection.extra_dejson.get.return_value = 'empty_project'
cloudsql_connection = Connection()
cloudsql_connection.parse_from_uri(uri)
cloudsql_connection2 = Connection()
cloudsql_connection2.parse_from_uri(uri)
get_connections.side_effect = [[gcp_connection], [cloudsql_connection],
   [cloudsql_connection2]]
{code}

Issues here are as follows.

1. no test ever used the third side effect

2. the first side effect does not help us; {{default_gcp_project_id}} is 
irrelevant

Only one of the three connections in {{_setup_connections}} has any impact on 
the test.

The call of {{BaseHook.get_connection}} only serves to discard the first 
connection in mock side effect list, {{gcp_connection}}.  

The second connection is the one that matters, and it is returned when 
{{CloudSqlDatabaseHook}} calls `self.get_connection` during init.  Since it is 
a mock side effect, it doesn't matter what value is given for conn_id.  So the 
{{CloudSqlDatabaseHook}} init param {{default_gcp_project_id}} has no 
consequence.  And because it has no consequence, we should not supply a value  
for it because this is misleading.






> don't call _get_connections_from_db in TestCloudSqlDatabaseHook
> ---
>
> Key: AIRFLOW-5720
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5720
> Project: Apache Airflow
> 

[jira] [Updated] (AIRFLOW-5720) don't call _get_connections_from_db in TestCloudSqlDatabaseHook

2019-10-22 Thread Daniel Standish (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-5720:
-
Description: 
Issues with this test class:

*tests are mocking lower-level than they need to*
Tests were mocking {{airflow.hook.BaseHook.get_connections}}.
Instead they can mock 
{{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is 
more direct.

*should not reference private method*

This is an impediment to refactoring of connections / creds.

*Tests had complexity that did not add a benefit*

They all had this bit:
{code:python}
self._setup_connections(get_connections, uri)
gcp_conn_id = 'google_cloud_default'
hook = CloudSqlDatabaseHook(

default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(
'extra__google_cloud_platform__project')
)
{code}

{{_setup_connections}} was like this:
{code:python}
@staticmethod
def _setup_connections(get_connections, uri):
gcp_connection = mock.MagicMock()
gcp_connection.extra_dejson = mock.MagicMock()
gcp_connection.extra_dejson.get.return_value = 'empty_project'
cloudsql_connection = Connection()
cloudsql_connection.parse_from_uri(uri)
cloudsql_connection2 = Connection()
cloudsql_connection2.parse_from_uri(uri)
get_connections.side_effect = [[gcp_connection], [cloudsql_connection],
   [cloudsql_connection2]]
{code}

Issues here are as follows.

1. no test ever used the third side effect

2. the first side effect does not help us; {{default_gcp_project_id}} is 
irrelevant

Only one of the three connections in {{_setup_connections}} has any impact on 
the test.

The call of {{BaseHook.get_connection}} only serves to discard the first 
connection in mock side effect list, {{gcp_connection}}.  

The second connection is the one that matters, and it is returned when 
{{CloudSqlDatabaseHook}} calls `self.get_connection` during init.  Since it is 
a mock side effect, it doesn't matter what value is given for conn_id.  So the 
{{CloudSqlDatabaseHook}} init param {{default_gcp_project_id}} has no 
consequence.  And because it has no consequence, we should not supply a value  
for it because this is misleading.





  was:
Issues with this test class:

*tests are mocking lower-level than they need to*
Tests were mocking {{airflow.hook.BaseHook.get_connections}}.
Instead they can mock 
{{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is 
more direct.

*should not reference private method*

This is an impediment to refactoring of connections / creds.

*Tests had complexity that did not add a benefit*

They all had this bit:
{code:python}
self._setup_connections(get_connections, uri)
gcp_conn_id = 'google_cloud_default'
hook = CloudSqlDatabaseHook(

default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(
'extra__google_cloud_platform__project')
)
{code}

{{_setup_connections}} was like this:
{code:python}
@staticmethod
def _setup_connections(get_connections, uri):
gcp_connection = mock.MagicMock()
gcp_connection.extra_dejson = mock.MagicMock()
gcp_connection.extra_dejson.get.return_value = 'empty_project'
cloudsql_connection = Connection()
cloudsql_connection.parse_from_uri(uri)
cloudsql_connection2 = Connection()
cloudsql_connection2.parse_from_uri(uri)
get_connections.side_effect = [[gcp_connection], [cloudsql_connection],
   [cloudsql_connection2]]
{code}

Issues here are as follows.

1. no test ever used the third side effect

2. the first side effect does not help us; {{default_gcp_project_id}} is 
irrelevant.

All this line serves to accomplish is to discard the first connection, 
{{gcp_connection}}.  This is the first invocation of 
{{BaseHook.get_connection}}.  

The second invocation is the one that matters, namely when 
{{CloudSqlDatabaseHook}} calls `self.get_connection`.

This is when {{cloudsql_connection}} is returned.  But since it is a mock side 
effect, it doesn't matter what value you give for conn_id.  So the param 
{{default_gcp_project_id}} has no consequence.  And because it has no 
consequence, we should not supply a value  for it because this is misleading.






> don't call _get_connections_from_db in TestCloudSqlDatabaseHook
> ---
>
> Key: AIRFLOW-5720
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5720
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: gcp
>Affects Versions: 1.10.5
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: 

[jira] [Updated] (AIRFLOW-5720) don't call _get_connections_from_db in TestCloudSqlDatabaseHook

2019-10-22 Thread Daniel Standish (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-5720:
-
Description: 
Issues with this test class:

*tests are mocking lower-level than they need to*
Tests were mocking {{airflow.hook.BaseHook.get_connections}}.
Instead they can mock 
{{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is 
more direct.

*should not reference private method*

This is an impediment to refactoring of connections / creds.

*Tests had complexity that did not add a benefit*

They all had this bit:
{code:python}
self._setup_connections(get_connections, uri)
gcp_conn_id = 'google_cloud_default'
hook = CloudSqlDatabaseHook(

default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(
'extra__google_cloud_platform__project')
)
{code}

{{_setup_connections}} was like this:
{code:python}
@staticmethod
def _setup_connections(get_connections, uri):
gcp_connection = mock.MagicMock()
gcp_connection.extra_dejson = mock.MagicMock()
gcp_connection.extra_dejson.get.return_value = 'empty_project'
cloudsql_connection = Connection()
cloudsql_connection.parse_from_uri(uri)
cloudsql_connection2 = Connection()
cloudsql_connection2.parse_from_uri(uri)
get_connections.side_effect = [[gcp_connection], [cloudsql_connection],
   [cloudsql_connection2]]
{code}

Issues here are as follows.

1. no test ever used the third side effect

2. the first side effect does not help us; {{default_gcp_project_id}} is 
irrelevant.

All this line serves to accomplish is to discard the first connection, 
{{gcp_connection}}.  This is the first invocation of 
{{BaseHook.get_connection}}.  

The second invocation is the one that matters, namely when 
{{CloudSqlDatabaseHook}} calls `self.get_connection`.

This is when {{cloudsql_connection}} is returned.  But since it is a mock side 
effect, it doesn't matter what value you give for conn_id.  So the param 
{{default_gcp_project_id}} has no consequence.  And because it has no 
consequence, we should not supply a value  for it because this is misleading.





  was:
Issues with this test class:

*tests are mocking lower-level than they need to*
Tests were mocking {{airflow.hook.BaseHook.get_connections}}.
Instead they can mock 
{{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is 
more direct.

*should not reference private method*

This is an impediment to refactoring of connections / creds.

*Tests had complexity that did not add a benefit*

They all had this bit:
{code:python}
self._setup_connections(get_connections, uri)
gcp_conn_id = 'google_cloud_default'
hook = CloudSqlDatabaseHook(

default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(
'extra__google_cloud_platform__project')
)
{code}

{{_setup_connections}} was like this:
{code:python}
@staticmethod
def _setup_connections(get_connections, uri):
gcp_connection = mock.MagicMock()
gcp_connection.extra_dejson = mock.MagicMock()
gcp_connection.extra_dejson.get.return_value = 'empty_project'
cloudsql_connection = Connection()
cloudsql_connection.parse_from_uri(uri)
cloudsql_connection2 = Connection()
cloudsql_connection2.parse_from_uri(uri)
get_connections.side_effect = [[gcp_connection], [cloudsql_connection],
   [cloudsql_connection2]]
{code}

Issues here are as follows.

1. no test ever used the third side effect

2. this line: 
{{default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(}}
 

All this line serves to accomplish is to discard the first connection, 
{{gcp_connection}}.  This is the first invocation of 
{{BaseHook.get_connection}}.  

The second invocation is the one that matters, namely when 
{{CloudSqlDatabaseHook}} calls `self.get_connection`.

This is when {{cloudsql_connection}} is returned.  But since it is a mock side 
effect, it doesn't matter what value you give for conn_id.  So the param 
{{default_gcp_project_id}} has no consequence.  And because it has no 
consequence, we should not supply a value  for it because this is misleading.






> don't call _get_connections_from_db in TestCloudSqlDatabaseHook
> ---
>
> Key: AIRFLOW-5720
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5720
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: gcp
>Affects Versions: 1.10.5
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Major
>
> Issues with this test class:
> *tests are 

[jira] [Updated] (AIRFLOW-5720) don't call _get_connections_from_db in TestCloudSqlDatabaseHook

2019-10-22 Thread Daniel Standish (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-5720:
-
Description: 
Issues with this test class:

*tests are mocking lower-level than they need to*
Tests were mocking {{airflow.hook.BaseHook.get_connections}}.
Instead they can mock 
{{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is 
more direct.

*should not reference private method*

This is an impediment to refactoring of connections / creds.

*Tests had complexity that did not add a benefit*

They all had this bit:
{code:python}
self._setup_connections(get_connections, uri)
gcp_conn_id = 'google_cloud_default'
hook = CloudSqlDatabaseHook(

default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(
'extra__google_cloud_platform__project')
)
{code}

{{_setup_connections}} was like this:
{code:python}
@staticmethod
def _setup_connections(get_connections, uri):
gcp_connection = mock.MagicMock()
gcp_connection.extra_dejson = mock.MagicMock()
gcp_connection.extra_dejson.get.return_value = 'empty_project'
cloudsql_connection = Connection()
cloudsql_connection.parse_from_uri(uri)
cloudsql_connection2 = Connection()
cloudsql_connection2.parse_from_uri(uri)
get_connections.side_effect = [[gcp_connection], [cloudsql_connection],
   [cloudsql_connection2]]
{code}

Issues here are as follows.

1. no test ever used the third side effect

2. this line: 
{{default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(}}
 

All this line serves to accomplish is to discard the first connection, 
{{gcp_connection}}.  This is the first invocation of 
{{BaseHook.get_connection}}.  

The second invocation is the one that matters, namely when 
{{CloudSqlDatabaseHook}} calls `self.get_connection`.

This is when {{cloudsql_connection}} is returned.  But since it is a mock side 
effect, it doesn't matter what value you give for conn_id.  So the param 
{{default_gcp_project_id}} has no consequence.  And because it has no 
consequence, we should not supply a value  for it because this is misleading.





  was:
Test should not reference "private" methods.  This impedes refactoring.

Also some other issues with these tests.

More detail TBD






> don't call _get_connections_from_db in TestCloudSqlDatabaseHook
> ---
>
> Key: AIRFLOW-5720
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5720
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: gcp
>Affects Versions: 1.10.5
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Major
>
> Issues with this test class:
> *tests are mocking lower-level than they need to*
> Tests were mocking {{airflow.hook.BaseHook.get_connections}}.
> Instead they can mock 
> {{airflow.gcp.hooks.cloud_sql.CloudSqlDatabaseHook.get_connection}} which is 
> more direct.
> *should not reference private method*
> This is an impediment to refactoring of connections / creds.
> *Tests had complexity that did not add a benefit*
> They all had this bit:
> {code:python}
> self._setup_connections(get_connections, uri)
> gcp_conn_id = 'google_cloud_default'
> hook = CloudSqlDatabaseHook(
> 
> default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(
> 'extra__google_cloud_platform__project')
> )
> {code}
> {{_setup_connections}} was like this:
> {code:python}
> @staticmethod
> def _setup_connections(get_connections, uri):
> gcp_connection = mock.MagicMock()
> gcp_connection.extra_dejson = mock.MagicMock()
> gcp_connection.extra_dejson.get.return_value = 'empty_project'
> cloudsql_connection = Connection()
> cloudsql_connection.parse_from_uri(uri)
> cloudsql_connection2 = Connection()
> cloudsql_connection2.parse_from_uri(uri)
> get_connections.side_effect = [[gcp_connection], 
> [cloudsql_connection],
>[cloudsql_connection2]]
> {code}
> Issues here are as follows.
> 1. no test ever used the third side effect
> 2. this line: 
> {{default_gcp_project_id=BaseHook.get_connection(gcp_conn_id).extra_dejson.get(}}
>  
> All this line serves to accomplish is to discard the first connection, 
> {{gcp_connection}}.  This is the first invocation of 
> {{BaseHook.get_connection}}.  
> The second invocation is the one that matters, namely when 
> {{CloudSqlDatabaseHook}} calls `self.get_connection`.
> This is when {{cloudsql_connection}} is returned.  But since it is a mock 
> side effect, it doesn't matter what value you 

[jira] [Created] (AIRFLOW-5720) don't call _get_connections_from_db in TestCloudSqlDatabaseHook

2019-10-22 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-5720:


 Summary: don't call _get_connections_from_db in 
TestCloudSqlDatabaseHook
 Key: AIRFLOW-5720
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5720
 Project: Apache Airflow
  Issue Type: New Feature
  Components: gcp
Affects Versions: 1.10.5
Reporter: Daniel Standish
Assignee: Daniel Standish


Test should not reference "private" methods.  This impedes refactoring.

Also some other issues with these tests.

More detail TBD







--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5705) add option for alternative creds backend

2019-10-19 Thread Daniel Standish (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-5705:
-
Description: 
Idea here is to create some kind of generic creds backend that could support 
using other creds stores such as AWS SSM parameter store.
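
As a rough illustration of the idea (the storage layout and helper name are assumptions, not the eventual design), an SSM-backed lookup might look like this:
{code:python}
import boto3

from airflow.models import Connection

def get_connection_from_ssm(conn_id, prefix='/airflow/connections'):
    # assumes each connection is stored as a URI under <prefix>/<conn_id>
    ssm = boto3.client('ssm')
    response = ssm.get_parameter(Name='{}/{}'.format(prefix, conn_id), WithDecryption=True)
    conn = Connection(conn_id=conn_id)
    conn.parse_from_uri(response['Parameter']['Value'])
    return conn
{code}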



  was:
Idea hear is to create some kind of generic creds backend that could support 
using other creds stores such as AWS SSM parameter store.




> add option for alternative creds backend
> 
>
> Key: AIRFLOW-5705
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5705
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: core
>Affects Versions: 1.10.5
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Major
>
> Idea here is to create some kind of generic creds backend that could support 
> using other creds stores such as AWS SSM parameter store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-5705) add option for alternative creds backend

2019-10-19 Thread Daniel Standish (Jira)
Daniel Standish created AIRFLOW-5705:


 Summary: add option for alternative creds backend
 Key: AIRFLOW-5705
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5705
 Project: Apache Airflow
  Issue Type: New Feature
  Components: core
Affects Versions: 1.10.5
Reporter: Daniel Standish
Assignee: Daniel Standish


Idea here is to create some kind of generic creds backend that could support 
using other creds stores such as AWS SSM parameter store.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-4799) tests using bash operator fail flakily because jinja2 rendering of environment variables

2019-06-15 Thread Daniel Standish (JIRA)
Daniel Standish created AIRFLOW-4799:


 Summary: tests using bash operator fail flakily because jinja2 
rendering of environment variables
 Key: AIRFLOW-4799
 URL: https://issues.apache.org/jira/browse/AIRFLOW-4799
 Project: Apache Airflow
  Issue Type: Bug
  Components: core
Affects Versions: 2.0.0
Reporter: Daniel Standish
Assignee: Daniel Standish


In {{test_retry_delay}} in the task instance tests we see this:

{code:python}
ti = TI(
    task=task, execution_date=timezone.utcnow())

self.assertEqual(ti.try_number, 1)
# first run -- up for retry
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
self.assertEqual(ti.try_number, 2)

# second run -- still up for retry because retry_delay hasn't expired
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)

# third run -- failed
time.sleep(3)
run_with_error(ti)
self.assertEqual(ti.state, State.FAILED)
{code}

The same TI is re-run multiple times.

The problem is that in the execute method of BashOperator, the {{env}} 
attribute is modified and updated to include a copy of the parent bash env. 
When this TI is executed another time, it again tries to render {{env}}, 
because it is a templated parameter, and it will attempt to load every {{.sh}} 
or {{.bash}} file found in {{env}} as a template.  If one of those files does 
not exist, the test fails with a template-not-found error.

The modification of {{env}} in the execute method should be local to the execute method.
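
A sketch of the kind of fix this implies inside {{BashOperator.execute}} (illustrative only, not the exact patch):
{code:python}
def execute(self, context):
    # keep the merged environment local instead of writing it back to self.env,
    # so re-rendering the templated ``env`` field on a later run sees the original value
    env = self.env
    if env is None:
        env = os.environ.copy()
    # ... launch the subprocess with ``env``, leaving self.env untouched
{code}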



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-4798) tests may reference dagbag objects that do not yet exist

2019-06-15 Thread Daniel Standish (JIRA)
Daniel Standish created AIRFLOW-4798:


 Summary: tests may reference dagbag objects that do not yet exist
 Key: AIRFLOW-4798
 URL: https://issues.apache.org/jira/browse/AIRFLOW-4798
 Project: Apache Airflow
  Issue Type: Bug
  Components: core
Affects Versions: 2.0.0
Reporter: Daniel Standish
Assignee: Daniel Standish


For example, the TaskInstance test `test_depends_on_past` tries to grab a dag 
'test_depends_on_past', but it is not there.  It is created in 
`test_scheduler_job.py`, so if you don't run that before the task instance tests, 
you will get a failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-4788) prev_execution_date is not always pendulum.datetime class

2019-06-12 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4788:
-
Description: 
Despite the documentation on the macros page, previous execution dates are in general 
not of pendulum type.

For one, when reading from the database, UtcDateTime returns the native datetime type.

Also, dag.previous_schedule returns the native datetime type.

So, in general, `prev_execution_date` and `ti.previous_ti.execution_date` may 
be non-pendulum. 

(There are edge cases where the context var prev_* is pendulum, e.g. when there 
is no DagRun, no schedule interval, or the run is manually triggered, but in general it is not.)

The problem is that this leads to errors and confusion when using these fields in 
templating, when you expect a pendulum object but it isn't one.

There are a few things to consider:
 # make the UtcDateTime sqlalchemy type return pendulum
 # make execution date a property of TaskInstance with an appropriate getter 
returning pendulum
 # change dag.previous_schedule to return pendulum
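
For example (illustrative), a native datetime coming back from the database or from {{dag.previous_schedule}} can be wrapped with {{pendulum.instance}}:
{code:python}
import datetime

import pendulum

dt = datetime.datetime(2019, 6, 12, tzinfo=datetime.timezone.utc)  # what UtcDateTime currently returns
prev_execution_date = pendulum.instance(dt)  # same instant, but a pendulum object
{code}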

 

  was:
In certain circumstances previous execution dates may not be pendulum type.

For one, when reading from database, UtcDateTime returns native datetime type.

Also dag.previous_schedule returns datetime type.

So, depending on circumstances, `prev_execution_date` and 
`ti.previous_ti.execution_date` may be non-pendulum. 

The problem is, this leads to errors and confusion when using these fields in 
templating, when you expect it to be pendulum but it isn't.

There are a few things to consider:
 # make UtcDateTime sqlalchemy type return pendulum
 # make execution date a property of TaskInstance with appropriate getter 
returning pendulum.
 # Change dag.previous_schedule to return pendulum

 


> prev_execution_date is not always pendulum.datetime class
> -
>
> Key: AIRFLOW-4788
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4788
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.10.3
>Reporter: Daniel Standish
>Priority: Major
>
> Despite documentation on macros page, previous execution dates are in general 
> not pendulum type.
> For one, when reading from database, UtcDateTime returns native datetime type.
> Also dag.previous_schedule returns datetime type.
> So, in general, `prev_execution_date` and `ti.previous_ti.execution_date` may 
> be non-pendulum. 
> (there are edge cases when the context var prev_* is pendulum e.g. when there 
> is no DR or no schedule interval or manually triggered, but in general, no.)
> The problem is, this leads to errors and confusion when using these fields in 
> templating, when you expect it to be pendulum but it isn't.
> There are a few things to consider:
>  # make UtcDateTime sqlalchemy type return pendulum
>  # make execution date a property of TaskInstance with appropriate getter 
> returning pendulum.
>  # Change dag.previous_schedule to return pendulum
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-4788) prev_execution_date is not always pendulum.datetime class

2019-06-12 Thread Daniel Standish (JIRA)
Daniel Standish created AIRFLOW-4788:


 Summary: prev_execution_date is not always pendulum.datetime class
 Key: AIRFLOW-4788
 URL: https://issues.apache.org/jira/browse/AIRFLOW-4788
 Project: Apache Airflow
  Issue Type: Bug
  Components: core
Affects Versions: 1.10.3
Reporter: Daniel Standish


In certain circumstances previous execution dates may not be pendulum type.

For one, when reading from database, UtcDateTime returns native datetime type.

Also dag.previous_schedule returns datetime type.

So, depending on circumstances, `prev_execution_date` and 
`ti.previous_ti.execution_date` may be non-pendulum. 

The problem is that this leads to errors and confusion when using these fields in 
templating, when you expect a pendulum object but it isn't one.

There are a few things to consider:
 # make UtcDateTime sqlalchemy type return pendulum
 # make execution date a property of TaskInstance with appropriate getter 
returning pendulum.
 # Change dag.previous_schedule to return pendulum

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-4787) clicking task status in dag view should take you to TIs of last run only

2019-06-12 Thread Daniel Standish (JIRA)
Daniel Standish created AIRFLOW-4787:


 Summary: clicking task status in dag view should take you to TIs 
of last run only
 Key: AIRFLOW-4787
 URL: https://issues.apache.org/jira/browse/AIRFLOW-4787
 Project: Apache Airflow
  Issue Type: Improvement
  Components: ui
Affects Versions: 1.10.3
Reporter: Daniel Standish


In the dags view, when you click on the task status from the last run (e.g. the red 
circle indicating failed tasks), it currently takes you to the TIs but includes _all_ 
failed tasks for that dag.  

If your dag has a lot of tasks, this makes it harder to clear them.  You have to 
select them all individually and be careful not to select tasks from a prior run.

Following this change, you would initially only see the tasks from the last run 
(consistent with the number represented in the dag view that you clicked on).  And 
if you want to see all tasks, you can just drop the execution date filter.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (AIRFLOW-4756) gantt chart view fails after clearing failed task in current run

2019-06-11 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish resolved AIRFLOW-4756.
--
   Resolution: Fixed
Fix Version/s: 1.10.4

merged into master [https://github.com/apache/airflow/pull/5399]

> gantt chart view fails after clearing failed task in current run
> 
>
> Key: AIRFLOW-4756
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4756
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: ui
>Affects Versions: 1.10.3
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Minor
> Fix For: 1.10.4
>
>
> To repro: 
> Make some dag with a number of decently long-running tasks (so you have time 
> to do this).
> Get the dag running
> Make sure one of the tasks fails.
> Before the others complete, clear the failing task.
> View gantt chart.
> Observer error like so:
> {code}
> Traceback (most recent call last):
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
>  line 2292, in wsgi_app
> response = self.full_dispatch_request()
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
>  line 1815, in full_dispatch_request
> rv = self.handle_user_exception(e)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
>  line 1718, in handle_user_exception
> reraise(exc_type, exc_value, tb)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/_compat.py",
>  line 35, in reraise
> raise value
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
>  line 1813, in full_dispatch_request
> rv = self.dispatch_request()
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
>  line 1799, in dispatch_request
> return self.view_functions[rule.endpoint](**req.view_args)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py",
>  line 69, in inner
> return self._run_view(f, *args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py",
>  line 368, in _run_view
> return fn(self, *args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_login/utils.py",
>  line 258, in decorated_view
> return func(*args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/utils.py",
>  line 275, in wrapper
> return f(*args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/utils/db.py",
>  line 73, in wrapper
> return func(*args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/views.py",
>  line 2015, in gantt
> root=root,
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py",
>  line 308, in render
> return render_template(template, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py",
>  line 135, in render_template
> context, ctx.app)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py",
>  line 117, in _render
> rv = template.render(context)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/asyncsupport.py",
>  line 76, in render
> return original_render(self, *args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py",
>  line 1008, in render
> return self.environment.handle_exception(exc_info, True)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py",
>  line 780, in handle_exception
> reraise(exc_type, exc_value, tb)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/_compat.py",
>  line 37, in reraise
> raise value.with_traceback(tb)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/gantt.html",
>  line 18, in top-level template code
> {% extends "airflow/dag.html" %}
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/dag.html",
>  line 19, in top-level template code
> {% import 'admin/lib.html' as lib with context %}
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/master.html",
>  line 18, in top-level template code
> {% extends "admin/master.html" %}
>   File 
> 

[jira] [Assigned] (AIRFLOW-4756) gantt chart view fails after clearing failed task in current run

2019-06-11 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish reassigned AIRFLOW-4756:


Assignee: Daniel Standish

> gantt chart view fails after clearing failed task in current run
> 
>
> Key: AIRFLOW-4756
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4756
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: ui
>Affects Versions: 1.10.3
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Minor
>
> To repro: 
> Make some dag with a number of decently long-running tasks (so you have time 
> to do this).
> Get the dag running
> Make sure one of the tasks fails.
> Before the others complete, clear the failing task.
> View gantt chart.
> Observer error like so:
> {code}
> Traceback (most recent call last):
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
>  line 2292, in wsgi_app
> response = self.full_dispatch_request()
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
>  line 1815, in full_dispatch_request
> rv = self.handle_user_exception(e)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
>  line 1718, in handle_user_exception
> reraise(exc_type, exc_value, tb)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/_compat.py",
>  line 35, in reraise
> raise value
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
>  line 1813, in full_dispatch_request
> rv = self.dispatch_request()
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
>  line 1799, in dispatch_request
> return self.view_functions[rule.endpoint](**req.view_args)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py",
>  line 69, in inner
> return self._run_view(f, *args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py",
>  line 368, in _run_view
> return fn(self, *args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_login/utils.py",
>  line 258, in decorated_view
> return func(*args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/utils.py",
>  line 275, in wrapper
> return f(*args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/utils/db.py",
>  line 73, in wrapper
> return func(*args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/views.py",
>  line 2015, in gantt
> root=root,
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py",
>  line 308, in render
> return render_template(template, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py",
>  line 135, in render_template
> context, ctx.app)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py",
>  line 117, in _render
> rv = template.render(context)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/asyncsupport.py",
>  line 76, in render
> return original_render(self, *args, **kwargs)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py",
>  line 1008, in render
> return self.environment.handle_exception(exc_info, True)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py",
>  line 780, in handle_exception
> reraise(exc_type, exc_value, tb)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/_compat.py",
>  line 37, in reraise
> raise value.with_traceback(tb)
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/gantt.html",
>  line 18, in top-level template code
> {% extends "airflow/dag.html" %}
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/dag.html",
>  line 19, in top-level template code
> {% import 'admin/lib.html' as lib with context %}
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/master.html",
>  line 18, in top-level template code
> {% extends "admin/master.html" %}
>   File 
> "/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/admin/master.html",
>  line 18, in top-level template 

[jira] [Created] (AIRFLOW-4756) gantt chart view fails after clearing failed task in current run

2019-06-10 Thread Daniel Standish (JIRA)
Daniel Standish created AIRFLOW-4756:


 Summary: gantt chart view fails after clearing failed task in 
current run
 Key: AIRFLOW-4756
 URL: https://issues.apache.org/jira/browse/AIRFLOW-4756
 Project: Apache Airflow
  Issue Type: Bug
  Components: ui
Affects Versions: 1.10.3
Reporter: Daniel Standish


To repro: 
Make some dag with a number of decently long-running tasks (so you have time to 
do this).

Get the dag running.

Make sure one of the tasks fails.

Before the others complete, clear the failing task.

View the gantt chart.

Observe an error like so:

{code}
Traceback (most recent call last):
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
 line 2292, in wsgi_app
response = self.full_dispatch_request()
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
 line 1815, in full_dispatch_request
rv = self.handle_user_exception(e)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
 line 1718, in handle_user_exception
reraise(exc_type, exc_value, tb)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/_compat.py",
 line 35, in reraise
raise value
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
 line 1813, in full_dispatch_request
rv = self.dispatch_request()
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/app.py",
 line 1799, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py",
 line 69, in inner
return self._run_view(f, *args, **kwargs)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py",
 line 368, in _run_view
return fn(self, *args, **kwargs)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_login/utils.py",
 line 258, in decorated_view
return func(*args, **kwargs)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/utils.py",
 line 275, in wrapper
return f(*args, **kwargs)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/utils/db.py",
 line 73, in wrapper
return func(*args, **kwargs)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/views.py",
 line 2015, in gantt
root=root,
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/base.py",
 line 308, in render
return render_template(template, **kwargs)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py",
 line 135, in render_template
context, ctx.app)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask/templating.py",
 line 117, in _render
rv = template.render(context)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/asyncsupport.py",
 line 76, in render
return original_render(self, *args, **kwargs)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py",
 line 1008, in render
return self.environment.handle_exception(exc_info, True)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/environment.py",
 line 780, in handle_exception
reraise(exc_type, exc_value, tb)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/jinja2/_compat.py",
 line 37, in reraise
raise value.with_traceback(tb)
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/gantt.html",
 line 18, in top-level template code
{% extends "airflow/dag.html" %}
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/dag.html",
 line 19, in top-level template code
{% import 'admin/lib.html' as lib with context %}
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/master.html",
 line 18, in top-level template code
{% extends "admin/master.html" %}
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/admin/master.html",
 line 18, in top-level template code
{% extends 'admin/base.html' %}
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/flask_admin/templates/bootstrap3/admin/base.html",
 line 94, in top-level template code
{% block tail %}
  File 
"/Users/someperson/.virtualenvs/scratch/lib/python3.7/site-packages/airflow/www/templates/airflow/gantt.html",
 line 58, in block "tail"
data = {{ data |tojson|safe }};
  File 

[jira] [Commented] (AIRFLOW-4298) Stop Scheduler warning repeatedly about "connection invalidated"

2019-05-30 Thread Daniel Standish (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851561#comment-16851561
 ] 

Daniel Standish commented on AIRFLOW-4298:
--

[~ash] [~gcuriel] yes, this is from Airflow, not SQLAlchemy.

One solution could be to use {{log.debug}} on the first reconnect, and only use 
{{log.warning}} if {{backoff > initial_backoff_seconds}} -- i.e. if it has to 
retry.  It seems like it always reconnects on the first try.

Of course it would be nice to know what is causing this and resolve it instead, 
but I am not sure how to figure that out, and it does not seem to present a 
material problem.
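
A rough sketch of that idea (hypothetical helper and parameter names, not the 
current airflow/utils/sqlalchemy.py code): stay quiet on the first reconnect 
attempt and escalate to a warning only once backoff has grown, i.e. a retry was 
actually needed.

{code}
# Hypothetical sketch of the proposed logging behaviour, not existing Airflow code.
import logging
import time

log = logging.getLogger(__name__)


def reconnect_with_backoff(ping, initial_backoff_seconds=1.0, max_attempts=5):
    """Call ``ping`` until it succeeds, backing off exponentially between tries."""
    backoff = initial_backoff_seconds
    for attempt in range(1, max_attempts + 1):
        try:
            ping()  # e.g. connection.scalar(select([1]))
            return
        except Exception:
            if backoff > initial_backoff_seconds:
                # A previous reconnect already failed -- this is worth a warning.
                log.warning("DB connection invalidated. Reconnecting (attempt %s)...", attempt)
            else:
                # First reconnect almost always succeeds, so keep it at DEBUG.
                log.debug("DB connection invalidated. Reconnecting...")
            time.sleep(backoff)
            backoff *= 2
    raise RuntimeError("could not re-establish DB connection")
{code}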

> Stop Scheduler warning repeatedly about "connection invalidated"
> 
>
> Key: AIRFLOW-4298
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4298
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: scheduler
>Affects Versions: 1.10.3
> Environment: Main host with airflow services: Cent-OS 7, Python 3.6.1.
> DB - official Docker image with Postgresql 11.2-alpine
>Reporter: Anton Cherkasov
>Priority: Minor
>
> I have a strange issue with the scheduler after upgrading to 1.10.3 from 1.10.2. 
> DAG tasks run only once. After that, the scheduler logs look like this:
> {noformat}
> Apr 11 09:21:33 airflow.infra airflow[32739]: [2019-04-11 09:21:33,094] 
> {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:21:44 airflow.infra airflow[32739]: [2019-04-11 09:21:44,105] 
> {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:21:55 airflow.infra airflow[32739]: [2019-04-11 09:21:55,114] 
> {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:22:06 airflow.infra airflow[32739]: [2019-04-11 09:22:06,123] 
> {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:22:17 airflow.infra airflow[32739]: [2019-04-11 09:22:17,131] 
> {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:22:28 airflow.infra airflow[32739]: [2019-04-11 09:22:28,143] 
> {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> {noformat}
> Logs from Scheduler with *DEBUG* level:
> {noformat}
> Apr 11 09:00:47 airflow.infra airflow[17403]: [2019-04-11 09:00:47,720] 
> {{settings.py:154}} DEBUG - Setting up DB connection pool (PID 17447)
> Apr 11 09:00:47 airflow.infra airflow[17403]: [2019-04-11 09:00:47,720] 
> {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. 
> pool_size=100, pool_recycle=3600, pid=17447
> Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,450] 
> {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 17449)
> Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,535] 
> {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 17448)
> Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,706] 
> {{jobs.py:1663}} DEBUG - Sleeping for 1.00 seconds to prevent excessive 
> logging
> . . .
> Apr 11 09:01:29 airflow.infra airflow[17403]: [2019-04-11 09:01:29,884] 
> {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21866)
> Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,492] 
> {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21865)
> Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,785] 
> {{jobs.py:496}} DEBUG - Waiting for  stopped)>
> Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,786] 
> {{jobs.py:496}} DEBUG - Waiting for  stopped)>
> Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,790] 
> {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,765] 
> {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21910)
> Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,786] 
> {{jobs.py:496}} DEBUG - Waiting for  stopped)>
> Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,866] 
> {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21909)
> Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,468] 
> {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21921)
> Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,787] 
> {{jobs.py:496}} DEBUG - Waiting for  stopped)>
> Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,787] 
> {{jobs.py:496}} DEBUG - Waiting for  stopped)>
> Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,358] 
> {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21933)
> Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,521] 
> {{settings.py:206}} DEBUG - Disposing 

[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning raised every JOB_HEARTBEAT_SEC

2019-03-21 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4134:
-
Summary: "DB connection invalidated" warning raised every JOB_HEARTBEAT_SEC 
 (was: "DB connection invalidated" every JOB_HEARTBEAT_SEC)

> "DB connection invalidated" warning raised every JOB_HEARTBEAT_SEC
> --
>
> Key: AIRFLOW-4134
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4134
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: 1.10.2
>Reporter: Daniel Standish
>Priority: Major
>
> I am finding with 1.10.2 that I seem to get a warning {{DB connection 
> invalidated. Reconnecting...}} very frequently.  It seems to coincide closely 
> with my job_heartbeat_sec parameter (5 seconds).
> To try to diagnose this, I added logging of the triggering error on line 79 
> in airflow/utils/sqlalchemy.py, from which this warning is generated.
> Looks like it is related to the zombie check.
> Call stack:
> {code}
> Call stack:
>   File "/usr/local/bin/airflow", line 32, in 
> args.func(args)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 
> 74, in wrapper
> return f(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, 
> in scheduler
> job.run()
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in 
> run
> self._execute()
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, 
> in _execute
> self._execute_helper()
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, 
> in _execute_helper
> self.processor_agent.start()
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 511, in start
> self._async_mode)
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 565, in _launch_process
> p.start()
>   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in 
> start
> self._popen = self._Popen(self)
>   File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in 
> _Popen
> return _default_context.get_context().Process._Popen(process_obj)
>   File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in 
> _Popen
> return Popen(process_obj)
>   File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in 
> __init__
> self._launch(process_obj)
>   File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in 
> _launch
> code = process_obj._bootstrap()
>   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in 
> _bootstrap
> self.run()
>   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
> self._target(*self._args, **self._kwargs)
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 560, in helper
> processor_manager.start()
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 797, in start
> self.start_in_async()
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 820, in start_in_async
> simple_dags = self.heartbeat()
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 1190, in heartbeat
> zombies = self._find_zombies()
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, 
> in wrapper
> return func(*args, **kwargs)
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 1236, in _find_zombies
> LJ.latest_heartbeat < limit_dttm,
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
> 2925, in all
> return list(self)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
> 3081, in __iter__
> return self._execute_and_instances(context)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
> 3103, in _execute_and_instances
> querycontext, self._connection_from_session, close_with_result=True
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
> 3111, in _get_bind_args
> mapper=self._bind_mapper(), clause=querycontext.statement, **kw
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
> 3096, in _connection_from_session
> conn = self.session.connection(**kw)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", 
> line 1120, in connection
> execution_options=execution_options,
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", 
> line 1126, in _connection_for_bind
> engine, execution_options
>   File 

[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" polluting scheduler log

2019-03-21 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4134:
-
Description: 
I am finding with 1.10.2 that I seem to get a warning {{DB connection 
invalidated. Reconnecting...}} very frequently.  It seems to coincide closely 
with my job_heartbeat_sec parameter (5 seconds).

To try to diagnose this, I added logging of the triggering error on line 79 in 
airflow/utils/sqlalchemy.py, from which this warning is generated.

Looks like it is related to the zombie check.

Call stack:
{code}
Call stack:
  File "/usr/local/bin/airflow", line 32, in 
args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, 
in wrapper
return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, 
in scheduler
job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in 
run
self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, in 
_execute
self._execute_helper()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, in 
_execute_helper
self.processor_agent.start()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
511, in start
self._async_mode)
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
565, in _launch_process
p.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in 
_Popen
return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in 
_Popen
return Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in 
__init__
self._launch(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in 
_launch
code = process_obj._bootstrap()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in 
_bootstrap
self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
560, in helper
processor_manager.start()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
797, in start
self.start_in_async()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
820, in start_in_async
simple_dags = self.heartbeat()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
1190, in heartbeat
zombies = self._find_zombies()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, 
in wrapper
return func(*args, **kwargs)
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
1236, in _find_zombies
LJ.latest_heartbeat < limit_dttm,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
2925, in all
return list(self)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3081, in __iter__
return self._execute_and_instances(context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3103, in _execute_and_instances
querycontext, self._connection_from_session, close_with_result=True
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3111, in _get_bind_args
mapper=self._bind_mapper(), clause=querycontext.statement, **kw
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3096, in _connection_from_session
conn = self.session.connection(**kw)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 
1120, in connection
execution_options=execution_options,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 
1126, in _connection_for_bind
engine, execution_options
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 
424, in _connection_for_bind
conn = bind.contextual_connect()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 
2194, in contextual_connect
**kwargs
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 
125, in __init__
self.dispatch.engine_connect(self, self.__branch)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 
297, in __call__
fn(*args, **kw)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", 
line 79, in ping_connection
log.warning("DB connection invalidated. Reconnecting...", err)
Message: 'DB connection 

[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" every JOB_HEARTBEAT_SEC

2019-03-21 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4134:
-
Summary: "DB connection invalidated" every JOB_HEARTBEAT_SEC  (was: "DB 
connection invalidated" polluting scheduler log)

> "DB connection invalidated" every JOB_HEARTBEAT_SEC
> ---
>
> Key: AIRFLOW-4134
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4134
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: 1.10.2
>Reporter: Daniel Standish
>Priority: Major
>
> I am finding with 1.10.2 that I seem to get a warning {{DB connection 
> invalidated. Reconnecting...}} very frequently.  It seems to coincide closely 
> with my job_heartbeat_sec parameter (5 seconds).
> To try to diagnose this, I added logging of the triggering error on line 79 
> in airflow/utils/sqlalchemy.py, from which this warning is generated.
> Looks like it is related to the zombie check.
> Call stack:
> {code}
> Call stack:
>   File "/usr/local/bin/airflow", line 32, in 
> args.func(args)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 
> 74, in wrapper
> return f(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, 
> in scheduler
> job.run()
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in 
> run
> self._execute()
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, 
> in _execute
> self._execute_helper()
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, 
> in _execute_helper
> self.processor_agent.start()
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 511, in start
> self._async_mode)
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 565, in _launch_process
> p.start()
>   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in 
> start
> self._popen = self._Popen(self)
>   File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in 
> _Popen
> return _default_context.get_context().Process._Popen(process_obj)
>   File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in 
> _Popen
> return Popen(process_obj)
>   File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in 
> __init__
> self._launch(process_obj)
>   File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in 
> _launch
> code = process_obj._bootstrap()
>   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in 
> _bootstrap
> self.run()
>   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
> self._target(*self._args, **self._kwargs)
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 560, in helper
> processor_manager.start()
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 797, in start
> self.start_in_async()
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 820, in start_in_async
> simple_dags = self.heartbeat()
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 1190, in heartbeat
> zombies = self._find_zombies()
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, 
> in wrapper
> return func(*args, **kwargs)
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", 
> line 1236, in _find_zombies
> LJ.latest_heartbeat < limit_dttm,
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
> 2925, in all
> return list(self)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
> 3081, in __iter__
> return self._execute_and_instances(context)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
> 3103, in _execute_and_instances
> querycontext, self._connection_from_session, close_with_result=True
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
> 3111, in _get_bind_args
> mapper=self._bind_mapper(), clause=querycontext.statement, **kw
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
> 3096, in _connection_from_session
> conn = self.session.connection(**kw)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", 
> line 1120, in connection
> execution_options=execution_options,
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", 
> line 1126, in _connection_for_bind
> engine, execution_options
>   File 

[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning at every zombie check

2019-03-20 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4134:
-
Description: 
I am finding with 1.10.2 that I seem to get a warning {{DB connection 
invalidated. Reconnecting...}} very frequently.

To try to diagnose this, I added logging of the triggering error on line 79 in 
airflow/utils/sqlalchemy.py, from which this warning is generated.

Call stack:
{code}
Call stack:
  File "/usr/local/bin/airflow", line 32, in 
args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, 
in wrapper
return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, 
in scheduler
job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in 
run
self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, in 
_execute
self._execute_helper()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, in 
_execute_helper
self.processor_agent.start()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
511, in start
self._async_mode)
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
565, in _launch_process
p.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in 
_Popen
return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in 
_Popen
return Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in 
__init__
self._launch(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in 
_launch
code = process_obj._bootstrap()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in 
_bootstrap
self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
560, in helper
processor_manager.start()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
797, in start
self.start_in_async()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
820, in start_in_async
simple_dags = self.heartbeat()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
1190, in heartbeat
zombies = self._find_zombies()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, 
in wrapper
return func(*args, **kwargs)
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
1236, in _find_zombies
LJ.latest_heartbeat < limit_dttm,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
2925, in all
return list(self)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3081, in __iter__
return self._execute_and_instances(context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3103, in _execute_and_instances
querycontext, self._connection_from_session, close_with_result=True
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3111, in _get_bind_args
mapper=self._bind_mapper(), clause=querycontext.statement, **kw
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3096, in _connection_from_session
conn = self.session.connection(**kw)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 
1120, in connection
execution_options=execution_options,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 
1126, in _connection_for_bind
engine, execution_options
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 
424, in _connection_for_bind
conn = bind.contextual_connect()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 
2194, in contextual_connect
**kwargs
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 
125, in __init__
self.dispatch.engine_connect(self, self.__branch)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 
297, in __call__
fn(*args, **kw)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", 
line 79, in ping_connection
log.warning("DB connection invalidated. Reconnecting...", err)
Message: 'DB connection invalidated. Reconnecting...'
Arguments: (OperationalError('(psycopg2.OperationalError) server closed the 
connection 

[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning at every zombie check

2019-03-20 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4134:
-
Description: 
I am finding with 1.10.2 that I seem to get a warning {{DB connection 
invalidated. Reconnecting...}} very frequently.

To try to diagnose this, I added logging of the triggering error on line 79 in 
airflow/utils/sqlalchemy.py, from which this warning is generated.

Call stack:
{code}
Call stack:
  File "/usr/local/bin/airflow", line 32, in 
args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, 
in wrapper
return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, 
in scheduler
job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in 
run
self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, in 
_execute
self._execute_helper()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, in 
_execute_helper
self.processor_agent.start()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
511, in start
self._async_mode)
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
565, in _launch_process
p.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in 
_Popen
return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in 
_Popen
return Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in 
__init__
self._launch(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in 
_launch
code = process_obj._bootstrap()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in 
_bootstrap
self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
560, in helper
processor_manager.start()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
797, in start
self.start_in_async()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
820, in start_in_async
simple_dags = self.heartbeat()
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
1190, in heartbeat
zombies = self._find_zombies()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, 
in wrapper
return func(*args, **kwargs)
  File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
1236, in _find_zombies
LJ.latest_heartbeat < limit_dttm,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
2925, in all
return list(self)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3081, in __iter__
return self._execute_and_instances(context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3103, in _execute_and_instances
querycontext, self._connection_from_session, close_with_result=True
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3111, in _get_bind_args
mapper=self._bind_mapper(), clause=querycontext.statement, **kw
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3096, in _connection_from_session
conn = self.session.connection(**kw)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 
1120, in connection
execution_options=execution_options,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 
1126, in _connection_for_bind
engine, execution_options
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 
424, in _connection_for_bind
conn = bind.contextual_connect()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 
2194, in contextual_connect
**kwargs
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 
125, in __init__
self.dispatch.engine_connect(self, self.__branch)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 
297, in __call__
fn(*args, **kw)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", 
line 79, in ping_connection
log.warning("DB connection invalidated. Reconnecting...", err)
Message: 'DB connection invalidated. Reconnecting...'
Arguments: (OperationalError('(psycopg2.OperationalError) server closed the 
connection 

[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning at every zombie check

2019-03-20 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4134:
-
Description: 
I am finding with 1.10.2 that I seem to get a warning {{DB connection 
invalidated. Reconnecting...}} very frequently.

To try to diagnose this, I added logging of the triggering error on line 79 in 
airflow/utils/sqlalchemy.py, from which this warning is generated.

Call stack:
{code}
webserver_1  | Call stack:
webserver_1  |   File "/usr/local/bin/airflow", line 32, in 
webserver_1  | args.func(args)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in 
wrapper
webserver_1  | return f(*args, **kwargs)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, in 
scheduler
webserver_1  | job.run()
webserver_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", 
line 205, in run
webserver_1  | self._execute()
webserver_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", 
line 1532, in _execute
webserver_1  | self._execute_helper()
webserver_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", 
line 1562, in _execute_helper
webserver_1  | self.processor_agent.start()
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
511, in start
webserver_1  | self._async_mode)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
565, in _launch_process
webserver_1  | p.start()
webserver_1  |   File "/usr/local/lib/python3.6/multiprocessing/process.py", 
line 105, in start
webserver_1  | self._popen = self._Popen(self)
webserver_1  |   File "/usr/local/lib/python3.6/multiprocessing/context.py", 
line 223, in _Popen
webserver_1  | return 
_default_context.get_context().Process._Popen(process_obj)
webserver_1  |   File "/usr/local/lib/python3.6/multiprocessing/context.py", 
line 277, in _Popen
webserver_1  | return Popen(process_obj)
webserver_1  |   File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", 
line 19, in __init__
webserver_1  | self._launch(process_obj)
webserver_1  |   File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", 
line 73, in _launch
webserver_1  | code = process_obj._bootstrap()
webserver_1  |   File "/usr/local/lib/python3.6/multiprocessing/process.py", 
line 258, in _bootstrap
webserver_1  | self.run()
webserver_1  |   File "/usr/local/lib/python3.6/multiprocessing/process.py", 
line 93, in run
webserver_1  | self._target(*self._args, **self._kwargs)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
560, in helper
webserver_1  | processor_manager.start()
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
797, in start
webserver_1  | self.start_in_async()
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
820, in start_in_async
webserver_1  | simple_dags = self.heartbeat()
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
1190, in heartbeat
webserver_1  | zombies = self._find_zombies()
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in 
wrapper
webserver_1  | return func(*args, **kwargs)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 
1236, in _find_zombies
webserver_1  | LJ.latest_heartbeat < limit_dttm,
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 2925, in 
all
webserver_1  | return list(self)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3081, in 
__iter__
webserver_1  | return self._execute_and_instances(context)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3103, in 
_execute_and_instances
webserver_1  | querycontext, self._connection_from_session, 
close_with_result=True
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3111, in 
_get_bind_args
webserver_1  | mapper=self._bind_mapper(), clause=querycontext.statement, 
**kw
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3096, in 
_connection_from_session
webserver_1  | conn = self.session.connection(**kw)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1120, 
in connection
webserver_1  | execution_options=execution_options,
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1126, 
in _connection_for_bind
webserver_1  | engine, 

[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning at every zombie check

2019-03-20 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4134:
-
Description: 
I am finding with 1.10.2 that I seem to get a warning {{DB connection 
invalidated. Reconnecting...}} very frequently.

To try to diagnose this, I added logging of the triggering error on line 79 in 
airflow/utils/sqlalchemy.py, from which this warning is generated.

Here's the traceback:

{code}
webserver_1  | Traceback (most recent call last):
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, 
in ping_connection
webserver_1  | connection.scalar(select([1]))
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, 
in scalar
webserver_1  | return self.execute(object_, *multiparams, **params).scalar()
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, 
in execute
webserver_1  | return meth(self, multiparams, params)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, 
in _execute_on_connection
webserver_1  | return connection._execute_clauseelement(self, multiparams, 
params)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, 
in _execute_clauseelement
webserver_1  | distilled_params,
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, 
in _execute_context
webserver_1  | e, statement, parameters, cursor, context
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, 
in _handle_dbapi_exception
webserver_1  | util.raise_from_cause(sqlalchemy_exception, exc_info)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, 
in raise_from_cause
webserver_1  | reraise(type(exception), exception, tb=exc_tb, cause=cause)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, 
in reraise
webserver_1  | raise value.with_traceback(tb)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1236, 
in _execute_context
webserver_1  | cursor, statement, parameters, context
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 
536, in do_execute
webserver_1  | cursor.execute(statement, parameters)
webserver_1  | sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) 
server closed the connection unexpectedly
webserver_1  |  This probably means the server terminated abnormally
webserver_1  |  before or while processing the request.
webserver_1  |  [SQL: 'SELECT 1'] (Background on this error at: 
http://sqlalche.me/e/e3q8)
{code}

It has something to do with the configure_orm function in airflow/settings.py, 
because that is the only usage of setup_event_handlers (from 
airflow/utils/sqlalchemy.py).

And if I disable connection pooling, then the warning seems to go away.

Beyond that, I am not sure where to go from here.  But something must be wrong. 
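
For context, "disabling connection pooling" here means something like the 
following at the SQLAlchemy level (illustrative connection URI, not Airflow's 
actual settings code):

{code}
# Illustrative only: with NullPool every checkout opens a fresh connection,
# so the engine can never hand back a stale pooled connection.
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

engine = create_engine(
    "postgresql+psycopg2://airflow:airflow@localhost/airflow",  # placeholder URI
    poolclass=NullPool,
)
{code}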
 




  was:
I am finding with 1.10.2 that I seem to get a warning {{DB connection 
invalidated. Reconnecting...}} very frequently.

To try to diagnose this, I added logging of the triggering error on line 79 in 
airflow/utils/sqlalchemy.py, from which this warning is generated.

Here's the traceback:

{code}
webserver_1  | Traceback (most recent call last):
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, 
in ping_connection
webserver_1  | connection.scalar(select([1]))
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, 
in scalar
webserver_1  | return self.execute(object_, *multiparams, **params).scalar()
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, 
in execute
webserver_1  | return meth(self, multiparams, params)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, 
in _execute_on_connection
webserver_1  | return connection._execute_clauseelement(self, multiparams, 
params)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, 
in _execute_clauseelement
webserver_1  | distilled_params,
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, 
in _execute_context
webserver_1  | e, statement, parameters, cursor, context
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, 
in _handle_dbapi_exception
webserver_1  | util.raise_from_cause(sqlalchemy_exception, exc_info)
webserver_1  |   File 

[jira] [Updated] (AIRFLOW-4134) DB connection invalidated warning at every zombie check

2019-03-20 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4134:
-
Description: 
I am finding with 1.10.2 that I seem to get a warning {{DB connection 
invalidated. Reconnecting...}} very frequently.

To try to diagnose this, I added logging of the triggering error on line 79 in 
airflow/utils/sqlalchemy.py, from which this warning is generated.

Here's the traceback:

{code}
webserver_1  | Traceback (most recent call last):
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, 
in ping_connection
webserver_1  | connection.scalar(select([1]))
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, 
in scalar
webserver_1  | return self.execute(object_, *multiparams, **params).scalar()
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, 
in execute
webserver_1  | return meth(self, multiparams, params)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, 
in _execute_on_connection
webserver_1  | return connection._execute_clauseelement(self, multiparams, 
params)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, 
in _execute_clauseelement
webserver_1  | distilled_params,
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, 
in _execute_context
webserver_1  | e, statement, parameters, cursor, context
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, 
in _handle_dbapi_exception
webserver_1  | util.raise_from_cause(sqlalchemy_exception, exc_info)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, 
in raise_from_cause
webserver_1  | reraise(type(exception), exception, tb=exc_tb, cause=cause)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, 
in reraise
webserver_1  | raise value.with_traceback(tb)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1236, 
in _execute_context
webserver_1  | cursor, statement, parameters, context
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 
536, in do_execute
webserver_1  | cursor.execute(statement, parameters)
webserver_1  | sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) 
server closed the connection unexpectedly
webserver_1  |  This probably means the server terminated abnormally
webserver_1  |  before or while processing the request.
webserver_1  |  [SQL: 'SELECT 1'] (Background on this error at: 
http://sqlalche.me/e/e3q8)
{code}

It has something to do with the configure_orm function in airflow/settings.py, 
because that is the only usage of setup_event_handlers (from 
airflow/utils/sqlalchemy.py).

I am not sure where to go from here.  But something must be wrong.  




  was:
I am finding with 1.10.2 that I seem to get a warning {{DB connection 
invalidated. Reconnecting...}} very frequently.

To try to diagnose this, I added logging of the triggering error on line 79 in 
airflow/utils/sqlalchemy.py, from which this warning is generated.

Here's the traceback:

{code}
webserver_1  | Traceback (most recent call last):
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, 
in ping_connection
webserver_1  | connection.scalar(select([1]))
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, 
in scalar
webserver_1  | return self.execute(object_, *multiparams, **params).scalar()
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, 
in execute
webserver_1  | return meth(self, multiparams, params)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, 
in _execute_on_connection
webserver_1  | return connection._execute_clauseelement(self, multiparams, 
params)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, 
in _execute_clauseelement
webserver_1  | distilled_params,
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, 
in _execute_context
webserver_1  | e, statement, parameters, cursor, context
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, 
in _handle_dbapi_exception
webserver_1  | util.raise_from_cause(sqlalchemy_exception, exc_info)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, 
in raise_from_cause
webserver_1  | 

[jira] [Updated] (AIRFLOW-4134) "DB connection invalidated" warning at every zombie check

2019-03-20 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4134:
-
Summary: "DB connection invalidated" warning at every zombie check  (was: 
DB connection invalidated warning at every zombie check)

> "DB connection invalidated" warning at every zombie check
> -
>
> Key: AIRFLOW-4134
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4134
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: 1.10.2
>Reporter: Daniel Standish
>Priority: Major
>
> I am finding with 1.10.2 that I seem to get a warning {{DB connection 
> invalidated. Reconnecting...}} very frequently.
> To try to diagnose this, I added logging of the triggering error on line 79 in 
> airflow/utils/sqlalchemy.py, from which this warning is generated.
> Here's the traceback:
> {code}
> webserver_1  | Traceback (most recent call last):
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 
> 68, in ping_connection
> webserver_1  | connection.scalar(select([1]))
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, 
> in scalar
> webserver_1  | return self.execute(object_, *multiparams, 
> **params).scalar()
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, 
> in execute
> webserver_1  | return meth(self, multiparams, params)
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 
> 273, in _execute_on_connection
> webserver_1  | return connection._execute_clauseelement(self, 
> multiparams, params)
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 
> 1099, in _execute_clauseelement
> webserver_1  | distilled_params,
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 
> 1240, in _execute_context
> webserver_1  | e, statement, parameters, cursor, context
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 
> 1458, in _handle_dbapi_exception
> webserver_1  | util.raise_from_cause(sqlalchemy_exception, exc_info)
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, 
> in raise_from_cause
> webserver_1  | reraise(type(exception), exception, tb=exc_tb, cause=cause)
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, 
> in reraise
> webserver_1  | raise value.with_traceback(tb)
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 
> 1236, in _execute_context
> webserver_1  | cursor, statement, parameters, context
> webserver_1  |   File 
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 
> 536, in do_execute
> webserver_1  | cursor.execute(statement, parameters)
> webserver_1  | sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) 
> server closed the connection unexpectedly
> webserver_1  |This probably means the server terminated abnormally
> webserver_1  |before or while processing the request.
> webserver_1  |  [SQL: 'SELECT 1'] (Background on this error at: 
> http://sqlalche.me/e/e3q8)
> {code}
> It has something to do with the configure_orm function in 
> airflow/settings.py, because that is the only usage of setup_event_handlers 
> (from airflow/utils/sqlalchemy.py).
> I am not sure where to go from here.  But something must be wrong.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-4134) DB connection invalidated warning at every zombie check

2019-03-20 Thread Daniel Standish (JIRA)
Daniel Standish created AIRFLOW-4134:


 Summary: DB connection invalidated warning at every zombie check
 Key: AIRFLOW-4134
 URL: https://issues.apache.org/jira/browse/AIRFLOW-4134
 Project: Apache Airflow
  Issue Type: Bug
Affects Versions: 1.10.2
Reporter: Daniel Standish


I am finding with 1.10.2 that I seem to get a warning {{DB connection 
invalidated. Reconnecting...}} very frequently.

To try to diagnose this, I added logging of the triggering error on line 79 in 
airflow/utils/sqlalchemy.py, from which this warning is generated.

Here's the traceback:

{code}
webserver_1  | Traceback (most recent call last):
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, 
in ping_connection
webserver_1  | connection.scalar(select([1]))
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, 
in scalar
webserver_1  | return self.execute(object_, *multiparams, **params).scalar()
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, 
in execute
webserver_1  | return meth(self, multiparams, params)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, 
in _execute_on_connection
webserver_1  | return connection._execute_clauseelement(self, multiparams, 
params)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, 
in _execute_clauseelement
webserver_1  | distilled_params,
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, 
in _execute_context
webserver_1  | e, statement, parameters, cursor, context
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, 
in _handle_dbapi_exception
webserver_1  | util.raise_from_cause(sqlalchemy_exception, exc_info)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, 
in raise_from_cause
webserver_1  | reraise(type(exception), exception, tb=exc_tb, cause=cause)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, 
in reraise
webserver_1  | raise value.with_traceback(tb)
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1236, 
in _execute_context
webserver_1  | cursor, statement, parameters, context
webserver_1  |   File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 
536, in do_execute
webserver_1  | cursor.execute(statement, parameters)
webserver_1  | sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) 
server closed the connection unexpectedly
webserver_1  |  This probably means the server terminated abnormally
webserver_1  |  before or while processing the request.
webserver_1  |  [SQL: 'SELECT 1'] (Background on this error at: 
http://sqlalche.me/e/e3q8)
{code}

I am not sure where to go from here.  But something must be wrong.  
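
For reference, the handler the traceback points at follows SQLAlchemy's 
pessimistic "ping on checkout" pattern; a simplified sketch (not the actual 
airflow/utils/sqlalchemy.py source) with the triggering error logged looks 
roughly like this:

{code}
# Simplified sketch of an engine_connect ping handler, with the underlying
# error included in the log so the cause of the invalidation is visible.
import logging

from sqlalchemy import event, exc, select

log = logging.getLogger(__name__)


def setup_ping(engine):
    @event.listens_for(engine, "engine_connect")
    def ping_connection(connection, branch):
        if branch:
            # Branched connections reuse an already-pinged parent connection.
            return
        try:
            connection.scalar(select([1]))  # cheap liveness check
        except exc.DBAPIError as err:
            if err.connection_invalidated:
                # Log the real cause, then retry once on a fresh connection.
                log.warning("DB connection invalidated. Reconnecting...", exc_info=err)
                connection.scalar(select([1]))
            else:
                raise
{code}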





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (AIRFLOW-4056) Dag file processing does not respect dag_dir_list_interval

2019-03-20 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish closed AIRFLOW-4056.

Resolution: Invalid

> Dag file processing does not respect dag_dir_list_interval
> --
>
> Key: AIRFLOW-4056
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4056
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Affects Versions: 1.10.2
> Environment: I have confirmed this issue on mac and centos 
> environments, using mysql backend.
>Reporter: Daniel Standish
>Priority: Major
>
> The conf parameter {{dag_dir_list_interval}} seems to have no effect on dag 
> directory scanning.
> It seems to happen every 2 seconds, no matter what.  The default is supposed 
> to be 5 minutes.
> As a result I see scheduler output like this:
> {code:java}
> [2019-03-09 17:06:24,579] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:26,587] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:28,590] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:30,597] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:32,603] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:34,611] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:35,195] {sqlalchemy.py:79} WARNING - DB connection 
> invalidated. Reconnecting...
> [2019-03-09 17:06:36,615] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:38,623] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:40,631] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:42,637] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:44,644] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:46,205] {sqlalchemy.py:79} WARNING - DB connection 
> invalidated. Reconnecting...
> [2019-03-09 17:06:46,651] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:48,658] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:50,666] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:52,670] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:54,680] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:06:56,687] {jobs.py:1559} INFO - Harvesting DAG parsing 
> results{code}
> The periodic printing of dag stats, which was present in 1.10.1, is also gone.
>  I can confirm that this is happening by adding this to something in dag 
> folder:
> {code:python}
> from datetime import datetime
> from pathlib import Path
> with open(Path('~/temp/test.log').expanduser(), 'at') as f:
>     f.write(f"{datetime.now()}: i am imported\n")
> {code}
> Here is some scheduler output with debug log level:
> {code}
>   ____________       _____________
>  ____    |__( )_________  __/__  /________      __
> ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
> ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
>  _/_/  |_/_/  /_//_//_/  \____/____/|__/
> [2019-03-09 17:20:59,042] {jobs.py:1477} INFO - Starting the scheduler
> [2019-03-09 17:20:59,042] {jobs.py:1485} INFO - Running execute loop for -1 
> seconds
> [2019-03-09 17:20:59,043] {jobs.py:1486} INFO - Processing each file at most 
> -1 times
> [2019-03-09 17:20:59,043] {jobs.py:1489} INFO - Searching for files in 
> /Users/dstandish/code/python_tfgetl/tfgetl/dags
> [2019-03-09 17:20:59,046] {jobs.py:1491} INFO - There are 11 files in 
> /Users/dstandish/code/python_tfgetl/tfgetl/dags
> [2019-03-09 17:20:59,105] {jobs.py:1534} INFO - Resetting orphaned tasks for 
> active dag runs
> [2019-03-09 17:20:59,121] {dag_processing.py:453} INFO - Launched 
> DagFileProcessorManager with pid: 57333
> [2019-03-09 17:20:59,122] {jobs.py:1548} DEBUG - Starting Loop...
> [2019-03-09 17:20:59,122] {jobs.py:1559} INFO - Harvesting DAG parsing results
> [2019-03-09 17:20:59,123] {jobs.py:1595} DEBUG - Heartbeating the executor
> [2019-03-09 17:20:59,123] {base_executor.py:118} DEBUG - 0 running task 
> instances
> [2019-03-09 17:20:59,123] {base_executor.py:119} DEBUG - 0 in queue
> [2019-03-09 17:20:59,123] {base_executor.py:120} DEBUG - 32 open slots
> [2019-03-09 17:20:59,124] {base_executor.py:149} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-03-09 17:20:59,128] {jobs.py:1613} DEBUG - Ran scheduling loop in 0.01 
> seconds
> [2019-03-09 17:20:59,129] {jobs.py:1614} DEBUG - Sleeping for 1.00 seconds
> [2019-03-09 17:20:59,130] {settings.py:51} INFO - Configured default timezone 
> 
> [2019-03-09 17:20:59,133] {logging_config.py:63} DEBUG - Unable to load 
> custom logging, using default config instead
> [2019-03-09 17:20:59,143] {settings.py:146} DEBUG - Setting up DB connection 

[jira] [Resolved] (AIRFLOW-3901) Add optional role parameter to snowflake hook

2019-03-12 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish resolved AIRFLOW-3901.
--
   Resolution: Fixed
Fix Version/s: (was: 1.10.3)

> Add optional role parameter to snowflake hook
> -
>
> Key: AIRFLOW-3901
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3901
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Trivial
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Role is a parameter missing from snowflake hook.
> I will add it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-4056) Dag file processing does not respect dag_dir_list_interval

2019-03-09 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4056:
-
Description: 
The conf parameter {{dag_dir_list_interval}} seems to have no effect on dag 
directory scanning.

It seems to happen every 2 seconds, no matter what.  The default is supposed to 
be 5 minutes.

As  a result I see a scheduler output like this:
{code:java}
[2019-03-09 17:06:24,579] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:26,587] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:28,590] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:30,597] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:32,603] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:34,611] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:35,195] {sqlalchemy.py:79} WARNING - DB connection 
invalidated. Reconnecting...
[2019-03-09 17:06:36,615] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:38,623] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:40,631] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:42,637] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:44,644] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:46,205] {sqlalchemy.py:79} WARNING - DB connection 
invalidated. Reconnecting...
[2019-03-09 17:06:46,651] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:48,658] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:50,666] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:52,670] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:54,680] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:56,687] {jobs.py:1559} INFO - Harvesting DAG parsing 
results{code}
The periodic printing of dag stats, which was present in 1.10.1, is also gone.

 I can confirm that this is happening by adding this to something in dag folder:
{code:python}
from datetime import datetime
from pathlib import Path
with open(Path('~/temp/test.log').expanduser(), 'at') as f:
    f.write(f"{datetime.now()}: i am imported\n")
{code}

Here is some scheduler output with debug log level:
{code}
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_//_//_/  \____/____/|__/

[2019-03-09 17:20:59,042] {jobs.py:1477} INFO - Starting the scheduler
[2019-03-09 17:20:59,042] {jobs.py:1485} INFO - Running execute loop for -1 
seconds
[2019-03-09 17:20:59,043] {jobs.py:1486} INFO - Processing each file at most -1 
times
[2019-03-09 17:20:59,043] {jobs.py:1489} INFO - Searching for files in 
/Users/dstandish/code/python_tfgetl/tfgetl/dags
[2019-03-09 17:20:59,046] {jobs.py:1491} INFO - There are 11 files in 
/Users/dstandish/code/python_tfgetl/tfgetl/dags
[2019-03-09 17:20:59,105] {jobs.py:1534} INFO - Resetting orphaned tasks for 
active dag runs
[2019-03-09 17:20:59,121] {dag_processing.py:453} INFO - Launched 
DagFileProcessorManager with pid: 57333
[2019-03-09 17:20:59,122] {jobs.py:1548} DEBUG - Starting Loop...
[2019-03-09 17:20:59,122] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:20:59,123] {jobs.py:1595} DEBUG - Heartbeating the executor
[2019-03-09 17:20:59,123] {base_executor.py:118} DEBUG - 0 running task 
instances
[2019-03-09 17:20:59,123] {base_executor.py:119} DEBUG - 0 in queue
[2019-03-09 17:20:59,123] {base_executor.py:120} DEBUG - 32 open slots
[2019-03-09 17:20:59,124] {base_executor.py:149} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
[2019-03-09 17:20:59,128] {jobs.py:1613} DEBUG - Ran scheduling loop in 0.01 
seconds
[2019-03-09 17:20:59,129] {jobs.py:1614} DEBUG - Sleeping for 1.00 seconds
[2019-03-09 17:20:59,130] {settings.py:51} INFO - Configured default timezone 

[2019-03-09 17:20:59,133] {logging_config.py:63} DEBUG - Unable to load custom 
logging, using default config instead
[2019-03-09 17:20:59,143] {settings.py:146} DEBUG - Setting up DB connection 
pool (PID 57333)
[2019-03-09 17:20:59,144] {settings.py:174} INFO - settings.configure_orm(): 
Using pool settings. pool_size=5, pool_recycle=1800, pid=57333
[2019-03-09 17:20:59,256] {settings.py:201} DEBUG - Disposing DB connection 
pool (PID 57334)
[2019-03-09 17:20:59,267] {settings.py:201} DEBUG - Disposing DB connection 
pool (PID 57337)
[2019-03-09 17:20:59,610] {settings.py:201} DEBUG - Disposing DB connection 
pool (PID 57336)
[2019-03-09 17:20:59,752] {settings.py:201} DEBUG - Disposing DB connection 
pool (PID 57335)
[2019-03-09 17:21:00,130] {jobs.py:1627} DEBUG - Sleeping for 0.99 seconds to 
prevent excessive logging
[2019-03-09 17:21:00,156] {jobs.py:489} DEBUG - Waiting for 

[2019-03-09 17:21:00,161] 


[jira] [Updated] (AIRFLOW-4056) Dag file processing does not respect dag_dir_list_interval

2019-03-09 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-4056:
-
Description: 
The conf parameter {{dag_dir_list_interval}} seems to have no effect on dag 
directory scanning.

It seems to happen every 2 seconds, no matter what.  The default is supposed to 
be 5 minutes.

As  a result I see a scheduler output like this:
{code:java}
[2019-03-09 17:06:24,579] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:26,587] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:28,590] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:30,597] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:32,603] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:34,611] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:35,195] {sqlalchemy.py:79} WARNING - DB connection 
invalidated. Reconnecting...
[2019-03-09 17:06:36,615] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:38,623] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:40,631] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:42,637] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:44,644] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:46,205] {sqlalchemy.py:79} WARNING - DB connection 
invalidated. Reconnecting...
[2019-03-09 17:06:46,651] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:48,658] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:50,666] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:52,670] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:54,680] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:56,687] {jobs.py:1559} INFO - Harvesting DAG parsing 
results{code}
The periodic printing of dag stats, which was present in 1.10.1, is also gone.

 I can confirm that this is happening by adding this to something in dag folder:
{code:java}
from datetime import datetime
from pathlib import Path
with open(Path('~/temp/test.log').expanduser(), 'at') as f:
    f.write(f"{datetime.now()}: i am imported\n")
{code}

  was:
The conf parameter dag_dir_list_interval seems to have no effect on dag 
directory scanning.

It seems to happen every 2 seconds, no matter what.  The default is supposed to 
be 5 minutes.

As  a result I see a scheduler output like this:
{code:java}
[2019-03-09 17:06:24,579] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:26,587] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:28,590] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:30,597] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:32,603] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:34,611] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:35,195] {sqlalchemy.py:79} WARNING - DB connection 
invalidated. Reconnecting...
[2019-03-09 17:06:36,615] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:38,623] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:40,631] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:42,637] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:44,644] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:46,205] {sqlalchemy.py:79} WARNING - DB connection 
invalidated. Reconnecting...
[2019-03-09 17:06:46,651] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:48,658] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:50,666] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:52,670] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:54,680] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:56,687] {jobs.py:1559} INFO - Harvesting DAG parsing 
results{code}
The periodic printing of dag stats, which was present in 1.10.1, is also gone.

 I can confirm that this is happening by adding this to something in dag folder:
{code:java}
from datetime import datetime
from pathlib import Path
with open(Path('~/temp/test.log').expanduser(), 'at') as f:
    f.write(f"{datetime.now()}: i am imported\n")
{code}


> Dag file processing does not respect dag_dir_list_interval
> --
>
> Key: AIRFLOW-4056
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4056
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Affects Versions: 1.10.2
> Environment: I have confirmed this issue on mac and centos 
> environments, using mysql backend.
>Reporter: Daniel Standish
>Priority: Major
>
> The conf parameter 

[jira] [Created] (AIRFLOW-4056) Dag file processing does not respect dag_dir_list_interval

2019-03-09 Thread Daniel Standish (JIRA)
Daniel Standish created AIRFLOW-4056:


 Summary: Dag file processing does not respect dag_dir_list_interval
 Key: AIRFLOW-4056
 URL: https://issues.apache.org/jira/browse/AIRFLOW-4056
 Project: Apache Airflow
  Issue Type: Bug
  Components: scheduler
Affects Versions: 1.10.2
 Environment: I have confirmed this issue on mac and centos 
environments, using mysql backend.
Reporter: Daniel Standish


The conf parameter dag_dir_list_interval seems to have no effect on dag 
directory scanning.

It seems to happen every 2 seconds, no matter what.  The default is supposed to 
be 5 minutes.

As  a result I see a scheduler output like this:
{code:java}
[2019-03-09 17:06:24,579] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:26,587] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:28,590] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:30,597] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:32,603] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:34,611] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:35,195] {sqlalchemy.py:79} WARNING - DB connection 
invalidated. Reconnecting...
[2019-03-09 17:06:36,615] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:38,623] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:40,631] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:42,637] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:44,644] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:46,205] {sqlalchemy.py:79} WARNING - DB connection 
invalidated. Reconnecting...
[2019-03-09 17:06:46,651] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:48,658] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:50,666] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:52,670] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:54,680] {jobs.py:1559} INFO - Harvesting DAG parsing results
[2019-03-09 17:06:56,687] {jobs.py:1559} INFO - Harvesting DAG parsing 
results{code}
The periodic printing of dag stats, which was present in 1.10.1, is also gone.

 I can confirm that this is happening by adding this to something in dag folder:
{code:java}
from datetime import datetime
from pathlib import Path
with open(Path('~/temp/test.log').expanduser(), 'at') as f:
    f.write(f"{datetime.now()}: i am imported\n")
{code}
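
Not a fix, but a quick sanity check, sketched under the assumption that the standard Airflow configuration API is importable on the scheduler host: it prints the interval the scheduler actually resolves, taking airflow.cfg and any AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL override into account.
{code:python}
# Print the effective dag_dir_list_interval as the scheduler would read it.
from airflow.configuration import conf

interval = conf.getint('scheduler', 'dag_dir_list_interval')
print(f"dag_dir_list_interval resolves to {interval} seconds")
{code}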



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-3901) Add optional role parameter to snowflake hook

2019-02-16 Thread Daniel Standish (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Standish updated AIRFLOW-3901:
-
Fix Version/s: 1.10.3

> Add optional role parameter to snowflake hook
> -
>
> Key: AIRFLOW-3901
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3901
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib
>Reporter: Daniel Standish
>Assignee: Daniel Standish
>Priority: Trivial
> Fix For: 1.10.3
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Role is a parameter missing from snowflake hook.
> I will add it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-3901) Add optional role parameter to snowflake hook

2019-02-16 Thread Daniel Standish (JIRA)
Daniel Standish created AIRFLOW-3901:


 Summary: Add optional role parameter to snowflake hook
 Key: AIRFLOW-3901
 URL: https://issues.apache.org/jira/browse/AIRFLOW-3901
 Project: Apache Airflow
  Issue Type: Improvement
  Components: contrib
Reporter: Daniel Standish
Assignee: Daniel Standish


Role is a parameter missing from snowflake hook.

I will add it.
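
For illustration only (this is not the actual hook change): the Snowflake Python connector already accepts a {{role}} keyword, so the hook mostly needs to pass it through when set. A minimal sketch with made-up parameter plumbing:
{code:python}
# Hypothetical sketch, not SnowflakeHook source: thread an optional role
# through to snowflake.connector.connect, only when it is provided.
import snowflake.connector

def connect_with_optional_role(account, user, password, database=None,
                               warehouse=None, role=None):
    conn_kwargs = {'account': account, 'user': user, 'password': password}
    if database:
        conn_kwargs['database'] = database
    if warehouse:
        conn_kwargs['warehouse'] = warehouse
    if role:
        conn_kwargs['role'] = role  # session starts under this role when given
    return snowflake.connector.connect(**conn_kwargs)
{code}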



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)