[jira] [Closed] (AIRFLOW-1354) www/views.py pool_link produces wrong link

2018-10-04 Thread Erik Cederstrand (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand closed AIRFLOW-1354.
-
Resolution: Not A Problem

I'm closing this because I think I just misunderstood what the link should do. 
The link will send you to the task instance list view, filtered to show only 
the task instances that are using this pool. This actually seems like 
reasonable behavior.
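
For illustration, the behavior described above boils down to a Flask-Admin
column formatter along these lines (a minimal sketch, not Airflow's actual
implementation; it assumes Flask-Admin's (view, context, model, name)
formatter signature, while the flt1_pool_equals filter syntax comes from the
issue itself):

{code}
from markupsafe import Markup

def pool_link(v, c, m, p):
    # m is the Pool row; link its name to the task instance list view,
    # filtered down to the task instances using this pool
    url = '/admin/taskinstance/?flt1_pool_equals=' + m.pool
    return Markup('<a href="{}">{}</a>'.format(url, m.pool))
{code}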

> www/views.py pool_link produces wrong link
> --
>
> Key: AIRFLOW-1354
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1354
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webapp
>Affects Versions: 1.10.0
>Reporter: Erik Cederstrand
>Priority: Minor
>
> The pool_link() function in www/views.py, which produces the link on the pool 
> names in the [/admin/pool/|http://green.nzcorp.net:7105/admin/pool/] list 
> view, builds a link to
> {code:none}
> '/admin/taskinstance/?flt1_pool_equals=' + m.pool{code}
> but should return
> {code:none}
> '/admin/pool/?flt1_pool_equals=' + m.pool{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-1354) www/views.py pool_link produces wrong link

2018-10-03 Thread Erik Cederstrand (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand updated AIRFLOW-1354:
--
Affects Version/s: (was: 1.8.0)
   1.10.0

> www/views.py pool_link produces wrong link
> --
>
> Key: AIRFLOW-1354
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1354
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webapp
>Affects Versions: 1.10.0
>Reporter: Erik Cederstrand
>Priority: Minor
>
> The pool_link() function in www/views.py, which produces the link on the pool 
> names in the [/admin/pool/|http://green.nzcorp.net:7105/admin/pool/] list 
> view, builds a link to
> {code:none}
> '/admin/taskinstance/?flt1_pool_equals=' + m.pool{code}
> but should return
> {code:none}
> '/admin/pool/?flt1_pool_equals=' + m.pool{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-1354) www/views.py pool_link produces wrong link

2018-10-03 Thread Erik Cederstrand (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand updated AIRFLOW-1354:
--
Description: 
The pool_link() function in www/views.py, which produces the link on the pool 
names in the [/admin/pool/|http://green.nzcorp.net:7105/admin/pool/] list view, 
builds a link to
{code:none}
'/admin/taskinstance/?flt1_pool_equals=' + m.pool{code}
but should return
{code:none}
'/admin/pool/?flt1_pool_equals=' + m.pool{code}

  was:
The pool_link() function in www/views.py builds a link to 
{code:none}'/admin/taskinstance/?flt1_pool_equals=' + m.pool{code}
but should probably return
{code:none}'/admin/pool/?flt1_pool_equals=' + m.pool{code}



> www/views.py pool_link produces wrong link
> --
>
> Key: AIRFLOW-1354
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1354
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webapp
>Affects Versions: 1.8.0
>Reporter: Erik Cederstrand
>Priority: Minor
>
> The pool_link() function in www/views.py, which produces the link on the pool 
> names in the [/admin/pool/|http://green.nzcorp.net:7105/admin/pool/] list 
> view, builds a link to
> {code:none}
> '/admin/taskinstance/?flt1_pool_equals=' + m.pool{code}
> but should return
> {code:none}
> '/admin/pool/?flt1_pool_equals=' + m.pool{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (AIRFLOW-949) kill_process_tree does not kill the root process

2018-10-03 Thread Erik Cederstrand (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand closed AIRFLOW-949.

Resolution: Duplicate

> kill_process_tree does not kill the root process
> 
>
> Key: AIRFLOW-949
> URL: https://issues.apache.org/jira/browse/AIRFLOW-949
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: utils
>Affects Versions: 1.8.0rc4
>Reporter: Erik Cederstrand
>Priority: Major
>  Labels: patch
> Attachments: helpers.patch
>
>
> The kill_process_tree() function in airflow/utils/helpers.py does not attempt 
> to kill the root process. Since there's also a kill_descendant_processes() 
> function, I assume that was the intent.
> Also, according to the comments, the intent is to first send SIGTERM, and 
> then SIGKILL, to descendant processes. But in fact, SIGTERM is sent twice.
> The attached patch fixes both problems.
> This was found while investigating why the airflow worker would not kill 
> certain jobs that had crashed. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-949) kill_process_tree does not kill the root process

2018-10-03 Thread Erik Cederstrand (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16636570#comment-16636570
 ] 

Erik Cederstrand commented on AIRFLOW-949:
--

Closing, as this seems to be fixed by AIRFLOW-1109.

> kill_process_tree does not kill the root process
> 
>
> Key: AIRFLOW-949
> URL: https://issues.apache.org/jira/browse/AIRFLOW-949
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: utils
>Affects Versions: 1.8.0rc4
>Reporter: Erik Cederstrand
>Priority: Major
>  Labels: patch
> Attachments: helpers.patch
>
>
> The kill_process_tree() function in airflow/utils/helpers.py does not attempt 
> to kill the root process. Since there's also a kill_descendant_processes() 
> function, I assume that was the intent.
> Also, according to the comments, the intent is to first send SIGTERM, and 
> then SIGKILL, to descendant processes. But in fact, SIGTERM is sent twice.
> The attached patch fixes both problems.
> This was found while investigating why the airflow worker would not kill 
> certain jobs that had crashed. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-1229) Make "Run Id" column clickable in Browse -> DAG Runs

2017-11-23 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264101#comment-16264101
 ] 

Erik Cederstrand commented on AIRFLOW-1229:
---

Thanks for taking the time to get this in, [~abij]!

> Make "Run Id" column clickable in Browse -> DAG Runs
> 
>
> Key: AIRFLOW-1229
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1229
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webapp
>Affects Versions: Airflow 1.8
> Environment: Python3.4
>Reporter: Erik Cederstrand
>Assignee: Alexander Bij
>  Labels: patch
> Attachments: dag_run_link.patch
>
>
> I'm triggering a lot of DAGs manually using "airflow trigger_dag my_dag 
> --run_id=some_unique_id". I would like to be able to browse easily in the UI 
> to this specific DAG run using the "some_unique_id" label. In the graph page
> of the DAG, I need to know the exact execution date, which is inconvenient, 
> and in the Browse -> DAG Runs page I can search by "some_unique_id", but the 
> "Run Ids" column is not clickable.
> The attached patch makes the aforementioned column clickable, so I'm sent 
> directly to the graph view for that specific DAG run, not the DAG in general.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (AIRFLOW-1354) www/views.py pool_link produces wrong link

2017-08-02 Thread Erik Cederstrand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand updated AIRFLOW-1354:
--
Description: 
The pool_link() function in www/views.py builds a link to 
{code:none}'/admin/taskinstance/?flt1_pool_equals=' + m.pool{code}
but should probably return
{code:none}'/admin/pool/?flt1_pool_equals=' + m.pool{code}


  was:
The dag_link() function in www/views.py builds a link to 
{code:none}'/admin/taskinstance/?flt1_pool_equals=' + m.pool{code}
but should probably return
{code:none}'/admin/pool/?flt1_pool_equals=' + m.pool{code}



> www/views.py pool_link produces wrong link
> --
>
> Key: AIRFLOW-1354
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1354
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webapp
>Affects Versions: Airflow 1.8
>Reporter: Erik Cederstrand
>Priority: Minor
>
> The pool_link() function in www/views.py builds a link to 
> {code:none}'/admin/taskinstance/?flt1_pool_equals=' + m.pool{code}
> but should probably return
> {code:none}'/admin/pool/?flt1_pool_equals=' + m.pool{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (AIRFLOW-1229) Make "Run Id" column clickable in Browse -> DAG Runs

2017-05-23 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16021055#comment-16021055
 ] 

Erik Cederstrand commented on AIRFLOW-1229:
---

Updated patch to contain execution_date in the generated link. This lets the 
graph show the correct DAG run instead of the latest, if there are multiple DAG 
runs with the same run_id.
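
For illustration, a formatter along the lines of the attached patch might look 
like this (a sketch, not the patch itself; the graph endpoint path and the 
quoting helper are assumptions based on the Airflow 1.8 web UI):

{code}
from markupsafe import Markup

try:
    from urllib.parse import quote_plus  # Python 3
except ImportError:
    from urllib import quote_plus  # Python 2

def dag_run_link(v, c, m, p):
    # m is the DagRun row; carry both dag_id and execution_date so the
    # graph view opens this specific run, not just the latest one
    url = '/admin/airflow/graph?dag_id={}&execution_date={}'.format(
        quote_plus(m.dag_id), quote_plus(str(m.execution_date)))
    return Markup('<a href="{}">{}</a>'.format(url, m.run_id))
{code}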

> Make "Run Id" column clickable in Browse -> DAG Runs
> 
>
> Key: AIRFLOW-1229
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1229
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webapp
>Affects Versions: Airflow 1.8
> Environment: Python3.4
>Reporter: Erik Cederstrand
>  Labels: patch
> Attachments: dag_run_link.patch
>
>
> I'm triggering a lot of DAGs manually using "airflow trigger_dag my_dag 
> --run_id=some_unique_id". I would like to be able to browse easily in the UI 
> to this specific DAG run using the "some_unique_id" label. In the graph page
> of the DAG, I need to know the exact execution date, which is inconvenient, 
> and in the Browse -> DAG Runs page I can search by "some_unique_id", but the 
> "Run Ids" column is not clickable.
> The attached patch makes the aforementioned column clickable, so I'm sent 
> directly to the graph view for that specific DAG run, not the DAG in general.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1229) Make "Run Id" column clickable in Browse -> DAG Runs

2017-05-23 Thread Erik Cederstrand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand updated AIRFLOW-1229:
--
Attachment: dag_run_link.patch

> Make "Run Id" column clickable in Browse -> DAG Runs
> 
>
> Key: AIRFLOW-1229
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1229
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webapp
>Affects Versions: Airflow 1.8
> Environment: Python3.4
>Reporter: Erik Cederstrand
>  Labels: patch
> Attachments: dag_run_link.patch
>
>
> I'm triggering a lot of DAGs manually using "airflow trigger_dag my_dag 
> --run_id=some_unique_id". I would like to be able to browse easily in the UI 
> to this specific DAG run using the "some_unique_id" label. In the graph page
> of the DAG, I need to know the exact execution date, which is inconvenient, 
> and in the Browse -> DAG Runs page I can search by "some_unique_id", but the 
> "Run Ids" column is not clickable.
> The attached patch makes the aforementioned column clickable, so I'm sent 
> directly to the graph view for that specific DAG run, not the DAG in general.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1229) Make "Run Id" column clickable in Browse -> DAG Runs

2017-05-23 Thread Erik Cederstrand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand updated AIRFLOW-1229:
--
Attachment: (was: dag_run_link.patch)

> Make "Run Id" column clickable in Browse -> DAG Runs
> 
>
> Key: AIRFLOW-1229
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1229
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webapp
>Affects Versions: Airflow 1.8
> Environment: Python3.4
>Reporter: Erik Cederstrand
>  Labels: patch
> Attachments: dag_run_link.patch
>
>
> I'm triggering a lot of DAGs manually using "airflow trigger_dag my_dag 
> --run_id=some_unique_id". I would like to be able to browse easily in the UI 
> to this specific DAG run using the "some_unique_id" label. In the graph page
> of the DAG, I need to know the exact execution date, which is inconvenient, 
> and in the Browse -> DAG Runs page I can search by "some_unique_id", but the 
> "Run Ids" column is not clickable.
> The attached patch makes the aforementioned column clickable, so I'm sent 
> directly to the graph view for that specific DAG run, not the DAG in general.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1229) Make "Run Id" column clickable in Browse -> DAG Runs

2017-05-19 Thread Erik Cederstrand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand updated AIRFLOW-1229:
--
Description: 
I'm triggering a lot of DAGs manually using "airflow trigger_dag my_dag 
--run_id=some_unique_id". I would like to be able to browse easily in the UI to 
this specific DAG run using the "some_unique_id" label. In the graph page of 
the DAG, I need to know the exact execution date, which is inconvenient, and in 
the Browse -> DAG Runs page I can search by "some_unique_id", but the "Run Ids" 
column is not clickable.

The attached patch makes the aforementioned column clickable, so I'm sent 
directly to the graph view for that specific DAG run, not the DAG in general.

  was:
I'm triggering a lot of DAGs manually using "airflow trigger_dag my_dag 
--run_id=some_unique_id". I would like to be able to browse easily in the UI to 
this specific DAG run using the "some_unique_id" label. In the graph page of 
the DAG, I need to know the execution date, and in the Browse -> DAG Runs page 
I can search by "some_unique_id", but the "Run Ids" column is not clickable.

The attached patch makes the aforementioned column clickable, so I'm sent 
directly to the graph view for that specific run, not the DAG in general.


> Make "Run Id" column clickable in Browse -> DAG Runs
> 
>
> Key: AIRFLOW-1229
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1229
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webapp
>Affects Versions: Airflow 1.8
> Environment: Python3.4
>Reporter: Erik Cederstrand
>  Labels: patch
> Attachments: dag_run_link.patch
>
>
> I'm triggering a lot of DAGs manually using "airflow trigger_dag my_dag 
> --run_id=some_unique_id". I would like to be able to browse easily in the UI 
> to this specific DAG run using the "some_unique_id" label. In the graph page
> of the DAG, I need to know the exact execution date, which is inconvenient, 
> and in the Browse -> DAG Runs page I can search by "some_unique_id", but the 
> "Run Ids" column is not clickable.
> The attached patch makes the aforementioned column clickable, so I'm sent 
> directly to the graph view for that specific DAG run, not the DAG in general.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-1229) Make "Run Id" column clickable in Browse -> DAG Runs

2017-05-19 Thread Erik Cederstrand (JIRA)
Erik Cederstrand created AIRFLOW-1229:
-

 Summary: Make "Run Id" column clickable in Browse -> DAG Runs
 Key: AIRFLOW-1229
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1229
 Project: Apache Airflow
  Issue Type: Improvement
  Components: webapp
Affects Versions: Airflow 1.8
 Environment: Python3.4
Reporter: Erik Cederstrand
 Attachments: dag_run_link.patch

I'm triggering a lot of DAGs manually using "airflow trigger_dag my_dag 
--run_id=some_unique_id". I would like to be able to browse easily in the UI to 
this specific DAG run using the "some_unique_id" label. In the graph page of 
the DAG, I need to know the execution date, and in the Browse -> DAG Runs page 
I can search by "some_unique_id", but the "Run Ids" column is not clickable.

The attached patch makes the aforementioned column clickable, so I'm sent 
directly to the graph view for that specific run, not the DAG in general.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-949) kill_process_tree does not kill the root process

2017-03-08 Thread Erik Cederstrand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand updated AIRFLOW-949:
-
Attachment: helpers.patch

Updated the patch with better log messages.

> kill_process_tree does not kill the root process
> 
>
> Key: AIRFLOW-949
> URL: https://issues.apache.org/jira/browse/AIRFLOW-949
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: utils
>Affects Versions: 1.8.0rc4
>Reporter: Erik Cederstrand
>  Labels: patch
> Attachments: helpers.patch
>
>
> The kill_process_tree() function in airflow/utils/helpers.py does not attempt 
> to kill the root process. Since there's also a kill_descendant_processes() 
> function, I assume that was the intent.
> Also, according to the comments, the intent is to first send SIGTERM, and 
> then SIGKILL, to descendant processes. But in fact, SIGTERM is sent twice.
> The attached patch fixes both problems.
> This was found while investigating why the airflow worker would not kill 
> certain jobs that had crashed. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-949) kill_process_tree does not kill the root process

2017-03-07 Thread Erik Cederstrand (JIRA)
Erik Cederstrand created AIRFLOW-949:


 Summary: kill_process_tree does not kill the root process
 Key: AIRFLOW-949
 URL: https://issues.apache.org/jira/browse/AIRFLOW-949
 Project: Apache Airflow
  Issue Type: Bug
  Components: utils
Affects Versions: 1.8.0rc4
Reporter: Erik Cederstrand
 Attachments: helpers.patch

The kill_process_tree() function in airflow/utils/helpers.py does not attempt to 
kill the root process. Since there's also a kill_descendant_processes() 
function, I assume that was the intent.

Also, according to the comments, the intent is to first send SIGTERM, and then 
SIGKILL, to descendant processes. But in fact, SIGTERM is sent twice.

The attached patch fixes both problems.

This was found while investigating why the airflow worker would not kill 
certain jobs that had crashed. 
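
For reference, the intended behavior can be sketched with psutil (an 
illustration of the idea, not the attached patch):

{code}
import psutil

def kill_process_tree(pid, timeout=5):
    # Kill the root process *and* its descendants: send SIGTERM first,
    # then SIGKILL to whatever is still alive after the timeout.
    root = psutil.Process(pid)
    procs = root.children(recursive=True) + [root]
    for p in procs:
        p.terminate()  # SIGTERM
    gone, alive = psutil.wait_procs(procs, timeout=timeout)
    for p in alive:
        p.kill()  # SIGKILL
{code}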



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-27 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/27/17 12:38 PM:


Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}; it has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs never progress, 
leaving jobs in the celery queue indefinitely and celery workers idling. Thus, 
wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does 
nothing for us.
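
Stripped of the patch context, the rate-limiting idea is simply this (a 
standalone sketch; the names mirror the patch, and the executor and logger 
objects are assumed to be passed in):

{code}
from datetime import datetime

def maybe_heartbeat(executor, last_heartbeat_time, heartrate, logger):
    # Heartbeat only when at least `heartrate` seconds have passed, and
    # survive RabbitMQ occasionally resetting the socket in the meantime.
    elapsed = (datetime.now() - last_heartbeat_time).total_seconds()
    if elapsed <= heartrate:
        return last_heartbeat_time
    logger.info("Heartbeating the executor")
    try:
        executor.heartbeat()
    except ConnectionResetError:
        pass  # RabbitMQ sometimes resets the socket connection
    return datetime.now()
{code}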


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Preciously, the scheduler would die every ~10 seconds, now it can 
live for some minutes. Here's a modified patch to simply ignore 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs never progress, 
leaving jobs in the celery queue indefinitely and celery workers idling. Thus, 
wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does 
nothing for us.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: 

[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:29 PM:
---

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}; it has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs never progress, 
leaving jobs in the celery queue indefinitely and celery workers idling. Thus, 
wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does 
nothing for us.


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}; it has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs never progress. Thus, 
wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does 
nothing for us.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: 

[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:28 PM:
---

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}; it has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs never progress. Thus, 
wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does 
nothing for us.


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}; it has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} 

[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:54 PM:


Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}; it has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch to simply ignore 
{{ConnectionResetError}}:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in 

[jira] [Commented] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869870#comment-15869870
 ] 

Erik Cederstrand commented on AIRFLOW-342:
--

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch to simply ignore 
{{ConnectionResetError}}:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869793#comment-15869793
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:08 PM:


I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will 
close the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.


was (Author: erikcederstrand):
I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will 
close the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:41:10.0 +
+++ jobs.py 2017-02-16 11:40:28.638116325 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File 

[jira] [Commented] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869793#comment-15869793
 ] 

Erik Cederstrand commented on AIRFLOW-342:
--

I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will 
close the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:41:10.0 +
+++ jobs.py 2017-02-16 11:40:28.638116325 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869598#comment-15869598
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 10:35 AM:


This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

Interestingly, "airflow scheduler" only crashes if there are tasks in "STARTED" 
state when it starts. If all tasks are "SUCCESS", the scheduler does not crash.


was (Author: erikcederstrand):
This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

Interestingly, {airflow scheduler} only crashes if there are tasks in "STARTED" 
state when it starts. If all tasks are "SUCCESS", the scheduler does not crash.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869598#comment-15869598
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 10:35 AM:


This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

Interestingly, "airflow scheduler" only crashes if there are tasks in "STARTED" 
state when it starts. If all tasks are "SUCCESS", the scheduler does not crash.


was (Author: erikcederstrand):
This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869598#comment-15869598
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 9:40 AM:
---

This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.


was (Author: erikcederstrand):
This may be a Celery issue that occurs when the worker starts up while there are already 
messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 and the 
exact same stack trace in https://github.com/celery/celery/issues/3773

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> The 'airflow scheduler' command throws an exception when run. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
> self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869598#comment-15869598
 ] 

Erik Cederstrand commented on AIRFLOW-342:
--

This may be a Celery issue that occurs when the worker starts up while there are already 
messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 and the 
exact same stack trace in https://github.com/celery/celery/issues/3773

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> The 'airflow scheduler' command throws an exception when run. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
> self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-15 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15868309#comment-15868309
 ] 

Erik Cederstrand commented on AIRFLOW-342:
--

I'm also running into this issue, but in 1.8.0-rc3, installed from GitHub. I 
have installed Airflow with Python3, RabbitMQ and Celery in a Docker setup, and 
start with one very simple DAG:

{code}
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 2, 12),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}


def test_callable(msg, *args, **kwargs):
    # Trivial task body: just print the message passed in via op_kwargs.
    print(msg)


dag = DAG('test_dag', default_args=default_args, schedule_interval='@daily')
test_task = PythonOperator(
    task_id='test_task',
    python_callable=test_callable,
    op_kwargs={
        'msg': 'Hello Airflow',
    },
    provide_context=True,
    dag=dag,
    retries=0,
)
{code}

When I start up the scheduler with {{airflow scheduler}}, it crashes with this 
output:
{code}
$ airflow scheduler
[2017-02-15 17:56:12,397] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-02-15 17:56:12,465] {driver.py:120} INFO - Generating grammar tables from 
/usr/lib/python3.4/lib2to3/Grammar.txt
[2017-02-15 17:56:12,482] {driver.py:120} INFO - Generating grammar tables from 
/usr/lib/python3.4/lib2to3/PatternGrammar.txt
[Airflow ASCII-art banner]

[2017-02-15 17:56:12,708] {jobs.py:1262} INFO - Starting the scheduler
[2017-02-15 17:56:12,708] {jobs.py:1278} INFO - Processing files using up to 2 
processes at a time 
[2017-02-15 17:56:12,708] {jobs.py:1280} INFO - Running execute loop for -1 
seconds
[2017-02-15 17:56:12,708] {jobs.py:1282} INFO - Processing each file at most -1 
times
[2017-02-15 17:56:12,708] {jobs.py:1284} INFO - Process each file at most once 
every 0 seconds
[2017-02-15 17:56:12,708] {jobs.py:1286} INFO - Checking for new files in 
/opt/bbd/src/workflows/dags every 300 seconds
[2017-02-15 17:56:12,708] {jobs.py:1289} INFO - Searching for files in 
/opt/bbd/src/workflows/dags
[2017-02-15 17:56:12,709] {jobs.py:1292} INFO - There are 1 files in 
/opt/bbd/src/workflows/dags
[2017-02-15 17:56:12,709] {jobs.py:1354} INFO - Resetting state for orphaned 
tasks
[2017-02-15 17:56:12,712] {jobs.py:1363} INFO - Resetting test_dag 2017-02-13 
00:00:00
[2017-02-15 17:56:12,718] {jobs.py:1404} INFO - Heartbeating the process manager
[2017-02-15 17:56:12,731] {dag_processing.py:627} INFO - Started a process 
(PID: 35) to generate tasks for /opt/bbd/src/workflows/dags/test_dag.py - 
logging into /opt/bbd/src/workflows/logs/scheduler/2017-02-15/test_dag.py.log
[2017-02-15 17:56:12,731] {jobs.py:1440} INFO - Heartbeating the executor
[2017-02-15 17:56:12,732] {jobs.py:1237} INFO - 

DAG File Processing Stats

File Path                                  PID  Runtime  Last Runtime  Last Run
-----------------------------------------  ---  -------  ------------  --------
/opt/bbd/src/workflows/dags/test_dag.py    35   0.00s

[2017-02-15 17:56:13,733] {jobs.py:1404} INFO - Heartbeating the process manager
[2017-02-15 17:56:13,734] {dag_processing.py:559} INFO - Processor for 
/opt/bbd/src/workflows/dags/test_dag.py finished
[2017-02-15 17:56:13,738] {dag_processing.py:627} INFO - Started a process 
(PID: 37) to generate tasks for /opt/bbd/src/workflows/dags/test_dag.py - 
logging into /opt/bbd/src/workflows/logs/scheduler/2017-02-15/test_dag.py.log
[2017-02-15 17:56:13,774] {jobs.py:985} INFO - Tasks up for execution:

[2017-02-15 17:56:13,779] {jobs.py:1008} INFO - Figuring out tasks to run in 
Pool(name=None) with 128 open slots and 1 task instances in queue
[2017-02-15 17:56:13,787] {jobs.py:1056} INFO - DAG test_dag has 0/2 running 
tasks
[2017-02-15 17:56:13,788] {jobs.py:1083} INFO - Sending to executor 
('test_dag', 'test_task', datetime.datetime(2017, 2, 14, 0, 0)) with priority 1 
and queue default
[2017-02-15 17:56:13,791] {jobs.py:1094} INFO - Setting state of ('test_dag', 
'test_task', datetime.datetime(2017, 2, 14, 0, 0)) to queued
[2017-02-15 17:56:13,806] {base_executor.py:50} INFO - Adding to queue: airflow 
run test_dag test_task 2017-02-14T00:00:00 --local -sd 
/opt/bbd/src/workflows/dags/test_dag.py
[2017-02-15 17:56:13,807] {jobs.py:1440} INFO - Heartbeating the executor
[2017-02-15 17:56:13,812] {celery_executor.py:78} INFO - [celery] queuing 
('test_dag', 'test_task', datetime.datetime(2017, 2, 14, 0, 0)) through 

[jira] [Updated] (AIRFLOW-840) Python3 encoding issue in Kerberos

2017-02-06 Thread Erik Cederstrand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand updated AIRFLOW-840:
-
Description: 
While attempting to configure Kerberos ticket renewal in a Python3 environment, 
I encountered this encoding issue trying to run {{airflow kerberos}}:

{code:none}
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in 
args.func(args)
  File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 600, 
in kerberos
airflow.security.kerberos.run()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 110, in run
renew_from_kt()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 55, in renew_from_kt
"\n".join(subp.stderr.readlines(
TypeError: sequence item 0: expected str instance, bytes found
{code}

The issue here (ignoring for a moment why {{kinit}} is failing on my machine) 
is that Popen in Python3 returns {{bytes}} for stdin/stdout, but both are 
handled as if they are {{str}}.

I'm unsure what the Py2/3 compat policy is at Airflow, but a simple {{from six 
import PY2}} and an if/else seems like the least intrusive fix. The non-PY2 
path would then add something like 
{{subp.stdin.readlines().decode(errors='ignore')}}
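
For concreteness, a minimal sketch of the fix I have in mind (the 
{{read_lines}} helper and the {{kinit}} invocation are illustrations, not 
Airflow's actual code; note that the traceback joins {{subp.stderr}}, and that 
{{readlines()}} returns a list, so each line must be decoded individually):

{code:python}
import subprocess

from six import PY2


def read_lines(stream):
    # Popen pipes yield bytes on Python 3, so decode each line before
    # joining; on Python 2 the lines are already str.
    if PY2:
        return stream.readlines()
    return [line.decode(errors='ignore') for line in stream.readlines()]


subp = subprocess.Popen(
    ['kinit', '-V'],  # stand-in for the real kinit command line
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
subp.wait()
print("\n".join(read_lines(subp.stderr)))
{code}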

  was:
While attempting to configure Kerberos ticket renewal in a Python3 environment, 
I encountered this encoding issue trying to run {{airflow kerberos}}:

{code:python}
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in 
args.func(args)
  File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 600, 
in kerberos
airflow.security.kerberos.run()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 110, in run
renew_from_kt()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 55, in renew_from_kt
"\n".join(subp.stderr.readlines(
TypeError: sequence item 0: expected str instance, bytes found
{code}

The issue here (ignoring for a moment why {{kinit}} is failing on my machine) 
is that Popen in Python3 returns {{bytes}} for stdin/stdout, but both are 
handled as if they are {{str}}.

I'm unsure what the Py2/3 compat policy is at Airflow, but a simple {{from six 
import PY2}} and an if/else seems like the least intrusive fix. The non-PY2 
path would then add something like 
{{subp.stdin.readlines().decode(errors='ignore')}}


> Python3 encoding issue in Kerberos
> --
>
> Key: AIRFLOW-840
> URL: https://issues.apache.org/jira/browse/AIRFLOW-840
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: security
>Affects Versions: Airflow 1.8
> Environment: $ python --version
> Python 3.4.3
>Reporter: Erik Cederstrand
>  Labels: security
>
> While attempting to configure Kerberos ticket renewal in a Python3 
> environment, I encountered this encoding issue trying to run {{airflow 
> kerberos}}:
> {code:none}
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 15, in <module>
> args.func(args)
>   File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 600, 
> in kerberos
> airflow.security.kerberos.run()
>   File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
> line 110, in run
> renew_from_kt()
>   File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
> line 55, in renew_from_kt
> "\n".join(subp.stderr.readlines(
> TypeError: sequence item 0: expected str instance, bytes found
> {code}
> The issue here (ignoring for a moment why {{kinit}} is failing on my machine) 
> is that Popen in Python3 returns {{bytes}} for stdin/stdout, but both are 
> handled as if they are {{str}}.
> I'm unsure what the Py2/3 compat policy is at Airflow, but a simple {{from 
> six import PY2}} and an if/else seems like the least intrusive fix. The 
> non-PY2 path would then add something like 
> {{subp.stdin.readlines().decode(errors='ignore')}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-840) Python3 encoding issue in Kerberos

2017-02-06 Thread Erik Cederstrand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand updated AIRFLOW-840:
-
Description: 
While attempting to configure Kerberos ticket renewal in a Python3 environment, 
I encountered this encoding issue trying to run {{airflow kerberos}}:

{code:python}
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in 
args.func(args)
  File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 600, 
in kerberos
airflow.security.kerberos.run()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 110, in run
renew_from_kt()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 55, in renew_from_kt
"\n".join(subp.stderr.readlines(
TypeError: sequence item 0: expected str instance, bytes found
{code}

The issue here (ignoring for a moment why {{kinit}} is failing on my machine) 
is that Popen in Python3 returns {{bytes}} for stdin/stdout, but both are 
handled as if they are {{str}}.

I'm unsure what the Py2/3 compat policy is at Airflow, but a simple {{from six 
import PY2}} and an if/else seems like the least intrusive fix. The non-PY2 
path would then add something like 
{{subp.stdin.readlines().decode(errors='ignore')}}

  was:
While attempting to configure Kerberos ticket renewal in a Python3 environment, 
I encountered this encoding issue trying to run {{airflow kerberos}}:

{quote}
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in 
args.func(args)
  File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 600, 
in kerberos
airflow.security.kerberos.run()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 110, in run
renew_from_kt()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 55, in renew_from_kt
"\n".join(subp.stderr.readlines(
TypeError: sequence item 0: expected str instance, bytes found
{quote}

The issue here (ignoring for a moment why {{kinit}} is failing on my machine) 
is that Popen in Python3 returns {{bytes}} for stdin/stdout, but both are 
handled as if they are {{str}}.

I'm unsure what the Py2/3 compat policy is at Airflow, but a simple {{from six 
import PY2}} and an if/else seems like the least intrusive fix. The non-PY2 
path would then add something like 
{{subp.stdin.readlines().decode(errors='ignore')}}


> Python3 encoding issue in Kerberos
> --
>
> Key: AIRFLOW-840
> URL: https://issues.apache.org/jira/browse/AIRFLOW-840
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: security
>Affects Versions: Airflow 1.8
> Environment: $ python --version
> Python 3.4.3
>Reporter: Erik Cederstrand
>  Labels: security
>
> While attempting to configure Kerberos ticket renewal in a Python3 
> environment, I encountered this encoding issue trying to run {{airflow 
> kerberos}}:
> {code:python}
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 15, in <module>
> args.func(args)
>   File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 600, 
> in kerberos
> airflow.security.kerberos.run()
>   File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
> line 110, in run
> renew_from_kt()
>   File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
> line 55, in renew_from_kt
> "\n".join(subp.stderr.readlines(
> TypeError: sequence item 0: expected str instance, bytes found
> {code}
> The issue here (ignoring for a moment why {{kinit}} is failing on my machine) 
> is that Popen in Python3 returns {{bytes}} for stdin/stdout, but both are 
> handled as if they are {{str}}.
> I'm unsure what the Py2/3 compat policy is at Airflow, but a simple {{from 
> six import PY2}} and an if/else seems like the least intrusive fix. The 
> non-PY2 path would then add something like 
> {{subp.stdin.readlines().decode(errors='ignore')}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-840) Python3 encoding issue in Kerberos

2017-02-06 Thread Erik Cederstrand (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Cederstrand updated AIRFLOW-840:
-
Description: 
While attempting to configure Kerberos ticket renewal in a Python3 environment, 
I encountered this encoding issue trying to run {{airflow kerberos}}:

{quote}
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in 
args.func(args)
  File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 600, 
in kerberos
airflow.security.kerberos.run()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 110, in run
renew_from_kt()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 55, in renew_from_kt
"\n".join(subp.stderr.readlines(
TypeError: sequence item 0: expected str instance, bytes found
{quote}

The issue here (ignoring for a moment why {{kinit}} is failing on my machine) 
is that Popen in Python3 returns {{bytes}} for stdin/stdout, but both are 
handled as if they are {{str}}.

I'm unsure what the Py2/3 compat policy is at Airflow, but a simple {{from six 
import PY2}} and an if/else seems like the least intrusive fix. The non-PY2 
path would then add something like 
{{subp.stdin.readlines().decode(errors='ignore')}}

  was:
While attempting to configure Kerberos ticket renewal in a Python3 environment, 
I encountered this encoding issue trying to run {{airflow kerberos}}:

{{
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in 
args.func(args)
  File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 600, 
in kerberos
airflow.security.kerberos.run()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 110, in run
renew_from_kt()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 55, in renew_from_kt
"\n".join(subp.stderr.readlines(
TypeError: sequence item 0: expected str instance, bytes found
}}

The issue here (ignoring for a moment why {{kinit}} is failing on my machine) 
is that Popen in Python3 returns {{bytes}} for stdin/stdout, but both are 
handled as if they are {{str}}.

I'm unsure what the Py2/3 compat policy is at Airflow, but a simple {{from six 
import PY2}} and an if/else seems like the least intrusive fix. The non-PY2 
path would then add something like 
{{subp.stdin.readlines().decode(errors='ignore')}}


> Python3 encoding issue in Kerberos
> --
>
> Key: AIRFLOW-840
> URL: https://issues.apache.org/jira/browse/AIRFLOW-840
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: security
>Affects Versions: Airflow 1.8
> Environment: $ python --version
> Python 3.4.3
>Reporter: Erik Cederstrand
>  Labels: security
>
> While attempting to configure Kerberos ticket renewal in a Python3 
> environment, I encountered this encoding issue trying to run {{airflow 
> kerberos}}:
> {quote}
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 15, in <module>
> args.func(args)
>   File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 600, 
> in kerberos
> airflow.security.kerberos.run()
>   File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
> line 110, in run
> renew_from_kt()
>   File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
> line 55, in renew_from_kt
> "\n".join(subp.stderr.readlines(
> TypeError: sequence item 0: expected str instance, bytes found
> {quote}
> The issue here (ignoring for a moment why {{kinit}} is failing on my machine) 
> is that Popen in Python3 returns {{bytes}} for stdin/stdout, but both are 
> handled as if they are {{str}}.
> I'm unsure what the Py2/3 compat policy is at Airflow, but a simple {{from 
> six import PY2}} and an if/else seems like the least intrusive fix. The 
> non-PY2 path would then add something like 
> {{subp.stdin.readlines().decode(errors='ignore')}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-840) Python3 encoding issue in Kerberos

2017-02-06 Thread Erik Cederstrand (JIRA)
Erik Cederstrand created AIRFLOW-840:


 Summary: Python3 encoding issue in Kerberos
 Key: AIRFLOW-840
 URL: https://issues.apache.org/jira/browse/AIRFLOW-840
 Project: Apache Airflow
  Issue Type: Bug
  Components: security
Affects Versions: Airflow 1.8
 Environment: $ python --version
Python 3.4.3
Reporter: Erik Cederstrand


While attempting to configure Kerberos ticket renewal in a Python3 environment, 
I encountered this encoding issue trying to run {{airflow kerberos}}:

{{
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in 
args.func(args)
  File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 600, 
in kerberos
airflow.security.kerberos.run()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 110, in run
renew_from_kt()
  File "/usr/local/lib/python3.4/dist-packages/airflow/security/kerberos.py", 
line 55, in renew_from_kt
"\n".join(subp.stderr.readlines(
TypeError: sequence item 0: expected str instance, bytes found
}}

The issue here (ignoring for a moment why {{kinit}} is failing on my machine) 
is that Popen in Python3 returns {{bytes}} for stdin/stdout, but both are 
handled as if they are {{str}}.

I'm unsure what the Py2/3 compat policy is at Airflow, but a simple {{from six 
import PY2}} and an if/else seems like the least intrusive fix. The non-PY2 
path would then add something like 
{{subp.stdin.readlines().decode(errors='ignore')}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)