[jira] [Commented] (AIRFLOW-214) TaskInstance can get detached in process_dag

2016-06-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317979#comment-15317979
 ] 

ASF subversion and git services commented on AIRFLOW-214:
---------------------------------------------------------

Commit 1e48c2b914375feaf7b8a3204cc29364a2c0cd02 in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=1e48c2b ]

[AIRFLOW-214] Fix occasion of detached taskinstance

For some reason task instances could occasionally become
detached from the database session. A fresh session is now used
to ensure the task instances stay attached.


> TaskInstance can get detached in process_dag
> 
>
> Key: AIRFLOW-214
> URL: https://issues.apache.org/jira/browse/AIRFLOW-214
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Bolke de Bruin
> Fix For: Airflow 1.8
>
>
> In some rare occasions the TaskInstance can get detached in process_dag. It 
> is unclear why.
> {code}
> [2016-06-06 17:47:37,048] {models.py:3444} INFO - Updating state for  scheduled__2016-06-05T00:00:00, externally triggered: False> considering 8 task(s)
> [2016-06-06 17:47:37,059] {jobs.py:670} ERROR - Instance  0x10d3e9410> is not bound to a Session; attribute refresh operation cannot proceed
> Traceback (most recent call last):
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.1.2-py2.7.egg/airflow/jobs.py", line 667, in _do_dags
>     self.process_dag(dag, tis_out)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.1.2-py2.7.egg/airflow/jobs.py", line 531, in process_dag
>     task = dag.get_task(ti.task_id)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/attributes.py", line 237, in __get__
>     return self.impl.get(instance_state(instance), dict_)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/attributes.py", line 578, in get
>     value = state._load_expired(state, passive)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/state.py", line 474, in _load_expired
>     self.manager.deferred_scalar_loader(self, toload)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 610, in load_scalar_attributes
>     (state_str(state)))
> DetachedInstanceError: Instance  is not bound to a Session; attribute refresh operation cannot proceed{code}
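The DetachedInstanceError above is SQLAlchemy's way of saying that an ORM object outlived the session it was loaded in, so a lazy attribute refresh has nowhere to go. The toy model below (plain Python, explicitly not SQLAlchemy) sketches that failure mode and the remedy the commit takes: go back through a live session.

```python
class DetachedInstanceError(Exception):
    """Stand-in for sqlalchemy.orm.exc.DetachedInstanceError."""

class Session:
    """Toy session: a tiny stand-in for an ORM session, NOT SQLAlchemy."""
    _db = {"ti_1": {"task_id": "print_date", "state": "scheduled"}}

    def __init__(self):
        self.open = True

    def get(self, pk):
        return Instance(pk, self)

    def close(self):
        self.open = False

class Instance:
    """Toy mapped object whose attributes load lazily via its session."""
    def __init__(self, pk, session):
        self._pk = pk
        self._session = session

    def __getattr__(self, name):
        # A lazy attribute refresh requires a live, bound session.
        if self._session is None or not self._session.open:
            raise DetachedInstanceError(
                "Instance is not bound to a Session; "
                "attribute refresh operation cannot proceed")
        return Session._db[self._pk][name]

# Reproduce the failure: the session dies while the object is still in use.
s = Session()
ti = s.get("ti_1")
s.close()
try:
    ti.task_id
except DetachedInstanceError as exc:
    print("detached:", exc)

# The commit's remedy, in miniature: re-fetch through a fresh session.
ti = Session().get("ti_1")
print(ti.task_id)  # -> print_date
```

The names (`Session._db`, `Instance`) are illustrative only; the real fix threads an explicit `session=session` through `DagRun.find`, `verify_integrity`, and `update_state`, as the diff in this thread shows.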



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[1/2] incubator-airflow git commit: [AIRFLOW-214] Fix occasion of detached taskinstance

2016-06-06 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 18009d033 -> 03ce4b9c0


[AIRFLOW-214] Fix occasion of detached taskinstance

For some reason task instances could occasionally become
detached from the database session. A fresh session is now used
to ensure the task instances stay attached.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1e48c2b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1e48c2b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1e48c2b9

Branch: refs/heads/master
Commit: 1e48c2b914375feaf7b8a3204cc29364a2c0cd02
Parents: 89edb6f
Author: Bolke de Bruin 
Authored: Mon Jun 6 17:52:58 2016 +0200
Committer: Bolke de Bruin 
Committed: Mon Jun 6 21:30:57 2016 +0200

--
 airflow/jobs.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1e48c2b9/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 005871f..5aaab3b 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -503,7 +503,7 @@ class SchedulerJob(BaseJob):
         session.commit()
 
         # update the state of the previously active dag runs
-        dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING)
+        dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
         active_dag_runs = []
         for run in dag_runs:
             # do not consider runs that are executed in the future
@@ -513,14 +513,15 @@
             # todo: run.task is transient but needs to be set
             run.dag = dag
             # todo: preferably the integrity check happens at dag collection time
-            run.verify_integrity()
-            run.update_state()
+            run.verify_integrity(session=session)
+            run.update_state(session=session)
             if run.state == State.RUNNING:
                 active_dag_runs.append(run)
 
         for run in active_dag_runs:
-            tis = run.get_task_instances(session=session, state=(State.NONE,
-                                                                 State.UP_FOR_RETRY))
+            # this needs a fresh session sometimes tis get detached
+            tis = run.get_task_instances(state=(State.NONE,
+                                                State.UP_FOR_RETRY))
 
             # this loop is quite slow as it uses are_dependencies_met for
             # every task (in ti.is_runnable). This is also called in



[2/2] incubator-airflow git commit: Merge remote-tracking branch 'apache/master'

2016-06-06 Thread bolke
Merge remote-tracking branch 'apache/master'


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/03ce4b9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/03ce4b9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/03ce4b9c

Branch: refs/heads/master
Commit: 03ce4b9c0992d8d17c850883243bb1cb963521b4
Parents: 1e48c2b 18009d0
Author: Bolke de Bruin 
Authored: Tue Jun 7 08:44:22 2016 +0200
Committer: Bolke de Bruin 
Committed: Tue Jun 7 08:44:22 2016 +0200

--
 airflow/configuration.py |   6 +-
 airflow/models.py|   3 +-
 airflow/www/app.py   |   2 +-
 airflow/www/views.py |   6 +-
 dev/README.md|   2 +-
 dev/airflow-pr   | 173 +++---
 tests/models.py  |  61 +++
 7 files changed, 165 insertions(+), 88 deletions(-)
--




[1/2] incubator-airflow-site git commit: Removing copyright notice

2016-06-06 Thread maximebeauchemin
Repository: incubator-airflow-site
Updated Branches:
  refs/heads/asf-site 9e19165ca -> 1bb548101


http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/1bb54810/searchindex.js
--
diff --git a/searchindex.js b/searchindex.js
index 434ea1e..93399ef 100644
--- a/searchindex.js
+++ b/searchindex.js
@@ -1 +1 @@
(minified searchindex.js diff omitted; truncated in the archive)

[2/2] incubator-airflow-site git commit: Removing copyright notice

2016-06-06 Thread maximebeauchemin
Removing copyright notice


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/commit/1bb54810
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/tree/1bb54810
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/diff/1bb54810

Branch: refs/heads/asf-site
Commit: 1bb548101c6475d89972f057fea572d2938cbaef
Parents: 9e19165
Author: Maxime Beauchemin 
Authored: Mon Jun 6 18:15:38 2016 -0700
Committer: Maxime Beauchemin 
Committed: Mon Jun 6 18:15:38 2016 -0700

--
 _modules/S3_hook.html| 1 -
 _modules/airflow/contrib/operators/hipchat_operator.html | 1 -
 _modules/airflow/executors/celery_executor.html  | 1 -
 _modules/airflow/executors/local_executor.html   | 1 -
 _modules/airflow/executors/sequential_executor.html  | 1 -
 _modules/airflow/macros.html | 1 -
 _modules/airflow/macros/hive.html| 1 -
 _modules/airflow/models.html | 1 -
 _modules/airflow/operators/docker_operator.html  | 1 -
 _modules/airflow/operators/sensors.html  | 1 -
 _modules/bash_operator.html  | 1 -
 _modules/cloudant_hook.html  | 1 -
 _modules/dagrun_operator.html| 1 -
 _modules/dbapi_hook.html | 1 -
 _modules/druid_hook.html | 1 -
 _modules/dummy_operator.html | 1 -
 _modules/email_operator.html | 1 -
 _modules/ftp_hook.html   | 1 -
 _modules/gcs_hook.html   | 1 -
 _modules/generic_transfer.html   | 1 -
 _modules/hive_hooks.html | 1 -
 _modules/hive_operator.html  | 1 -
 _modules/hive_to_druid.html  | 1 -
 _modules/hive_to_mysql.html  | 1 -
 _modules/hive_to_samba_operator.html | 1 -
 _modules/http_hook.html  | 1 -
 _modules/http_operator.html  | 1 -
 _modules/index.html  | 1 -
 _modules/mssql_hook.html | 1 -
 _modules/mssql_operator.html | 1 -
 _modules/mssql_to_hive.html  | 1 -
 _modules/mysql_hook.html | 1 -
 _modules/mysql_operator.html | 1 -
 _modules/mysql_to_hive.html  | 1 -
 _modules/postgres_hook.html  | 1 -
 _modules/postgres_operator.html  | 1 -
 _modules/presto_check_operator.html  | 1 -
 _modules/presto_hook.html| 1 -
 _modules/python_operator.html| 1 -
 _modules/s3_to_hive_operator.html| 1 -
 _modules/sensors.html| 1 -
 _modules/slack_operator.html | 1 -
 _modules/sqlite_hook.html| 1 -
 _modules/ssh_execute_operator.html   | 1 -
 _modules/ssh_hook.html   | 1 -
 _modules/vertica_hook.html   | 1 -
 _modules/vertica_operator.html   | 1 -
 _modules/vertica_to_hive.html| 1 -
 _modules/webhdfs_hook.html   | 1 -
 _sources/project.txt | 1 +
 cli.html | 1 -
 code.html| 1 -
 concepts.html| 1 -
 configuration.html   | 1 -
 faq.html | 1 -
 genindex.html| 1 -
 index.html   | 1 -
 installation.html| 1 -
 license.html | 1 -
 plugins.html | 1 -
 profiling.html   | 1 -
 project.html | 2 +-
 py-modindex.html | 1 -
 scheduler.html   | 1 -
 search.html  | 1 -
 searchindex.js   | 2 +-
 security.html| 1 -
 start.html

[jira] [Commented] (AIRFLOW-131) XCom data is cleared even if a task already succeeded

2016-06-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317330#comment-15317330
 ] 

ASF subversion and git services commented on AIRFLOW-131:
---------------------------------------------------------

Commit 18009d03311a0b29e14811865e0b13b19427b5e4 in incubator-airflow's branch 
refs/heads/master from [~john.nason]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=18009d0 ]

[AIRFLOW-131] Make XCom.clear more selective

XCOMs for a task were getting cleared on every run, no matter what.
Selectively clear only when the task is actually going to be run.

Closes #1570 from johnnason/AIRFLOW-131


> XCom data is cleared even if a task already succeeded
> -
>
> Key: AIRFLOW-131
> URL: https://issues.apache.org/jira/browse/AIRFLOW-131
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: xcom
>Affects Versions: Airflow 1.7.1
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>
> In {{TaskInstance.run()}}, the XCom data is immediately cleared. It should 
> not be cleared until we determine that the task is actually going to run. For 
> example, if the task already succeeded it won't run and we want the XCom data 
> to remain.
> Reported here: 
> https://github.com/apache/incubator-airflow/issues/999#issuecomment-220047510
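The fix in this thread moves the `clear_xcom_data()` call out of the unconditional path at the top of `TaskInstance.run()` and into the branches where the task will actually (re)run. A minimal stand-in of that control flow (illustrative names, not Airflow's real TaskInstance API):

```python
def run(ti, force=False):
    """Sketch of the corrected control flow: XCom data is cleared only
    on branches where the task actually (re)runs. `ti` is a plain dict
    standing in for a TaskInstance."""
    if ti["state"] == "running":
        return "skipped: another instance is running"
    if ti["state"] == "success" and not force:
        # Prior success: do NOT clear, so xcom_pull still sees the data.
        return "skipped: already succeeded"
    ti["xcom"].clear()          # about to run, so now it is safe to clear
    ti["state"] = "running"
    return "running"

ti = {"state": "success", "xcom": {"key": "value"}}
print(run(ti))       # skipped: already succeeded
print(ti["xcom"])    # {'key': 'value'} -- preserved, per AIRFLOW-131
print(run(ti, force=True), ti["xcom"])
```

The point of the ordering is simply that clearing is a side effect of deciding to run, not of being asked to run.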



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (AIRFLOW-131) XCom data is cleared even if a task already succeeded

2016-06-06 Thread Jeremiah Lowin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeremiah Lowin resolved AIRFLOW-131.

Resolution: Fixed

Issue resolved by pull request #1570
[https://github.com/apache/incubator-airflow/pull/1570]

> XCom data is cleared even if a task already succeeded
> -
>
> Key: AIRFLOW-131
> URL: https://issues.apache.org/jira/browse/AIRFLOW-131
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: xcom
>Affects Versions: Airflow 1.7.1
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>
> In {{TaskInstance.run()}}, the XCom data is immediately cleared. It should 
> not be cleared until we determine that the task is actually going to run. For 
> example, if the task already succeeded it won't run and we want the XCom data 
> to remain.
> Reported here: 
> https://github.com/apache/incubator-airflow/issues/999#issuecomment-220047510



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-131) XCom data is cleared even if a task already succeeded

2016-06-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317329#comment-15317329
 ] 

ASF subversion and git services commented on AIRFLOW-131:
---------------------------------------------------------

Commit 18009d03311a0b29e14811865e0b13b19427b5e4 in incubator-airflow's branch 
refs/heads/master from [~john.nason]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=18009d0 ]

[AIRFLOW-131] Make XCom.clear more selective

XCOMs for a task were getting cleared on every run, no matter what.
Selectively clear only when the task is actually going to be run.

Closes #1570 from johnnason/AIRFLOW-131


> XCom data is cleared even if a task already succeeded
> -
>
> Key: AIRFLOW-131
> URL: https://issues.apache.org/jira/browse/AIRFLOW-131
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: xcom
>Affects Versions: Airflow 1.7.1
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>
> In {{TaskInstance.run()}}, the XCom data is immediately cleared. It should 
> not be cleared until we determine that the task is actually going to run. For 
> example, if the task already succeeded it won't run and we want the XCom data 
> to remain.
> Reported here: 
> https://github.com/apache/incubator-airflow/issues/999#issuecomment-220047510



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


incubator-airflow git commit: [AIRFLOW-131] Make XCom.clear more selective

2016-06-06 Thread jlowin
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 275aa15fa -> 18009d033


[AIRFLOW-131] Make XCom.clear more selective

XCOMs for a task were getting cleared on every run, no matter what.
Selectively clear only when the task is actually going to be run.

Closes #1570 from johnnason/AIRFLOW-131


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/18009d03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/18009d03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/18009d03

Branch: refs/heads/master
Commit: 18009d03311a0b29e14811865e0b13b19427b5e4
Parents: 275aa15
Author: John Nason 
Authored: Mon Jun 6 17:50:06 2016 -0400
Committer: jlowin 
Committed: Mon Jun 6 17:50:10 2016 -0400

--
 airflow/models.py |  3 ++-
 tests/models.py   | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/18009d03/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index fa3f6ca..08d0890 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1186,7 +1186,6 @@ class TaskInstance(Base):
         self.test_mode = test_mode
         self.force = force
         self.refresh_from_db(session=session, lock_for_update=True)
-        self.clear_xcom_data()
         self.job_id = job_id
         iso = datetime.now().isoformat()
         self.hostname = socket.getfqdn()
@@ -1195,6 +1194,7 @@
         if self.state == State.RUNNING:
             logging.warning("Another instance is running, skipping.")
         elif self.state == State.REMOVED:
+            self.clear_xcom_data()
             logging.debug("Task {} was removed from the dag".format(self))
         elif not force and self.state == State.SUCCESS:
             logging.info(
@@ -1219,6 +1219,7 @@
                 "Next run after {0}".format(next_run)
             )
         elif force or self.state in State.runnable():
+            self.clear_xcom_data()
             HR = "\n" + ("-" * 80) + "\n"  # Line break
 
             # For reporting purposes, we report based on 1-indexed,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/18009d03/tests/models.py
--
diff --git a/tests/models.py b/tests/models.py
index bc3f1e1..2aae476 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -507,3 +507,64 @@ class TaskInstanceTest(unittest.TestCase):
 
         self.assertEqual(completed, expect_completed)
         self.assertEqual(ti.state, expect_state)
+
+    def test_xcom_pull_after_success(self):
+        """
+        tests xcom set/clear relative to a task in a 'success' rerun scenario
+        """
+        key = 'xcom_key'
+        value = 'xcom_value'
+
+        dag = models.DAG(dag_id='test_xcom', schedule_interval='@monthly')
+        task = DummyOperator(
+            task_id='test_xcom',
+            dag=dag,
+            pool='test_xcom',
+            owner='airflow',
+            start_date=datetime.datetime(2016, 6, 2, 0, 0, 0))
+        exec_date = datetime.datetime.now()
+        ti = TI(
+            task=task, execution_date=exec_date)
+        ti.run(mark_success=True)
+        ti.xcom_push(key=key, value=value)
+        self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
+        ti.run()
+        # The second run and assert is to handle AIRFLOW-131 (don't clear on
+        # prior success)
+        self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
+
+    def test_xcom_pull_different_execution_date(self):
+        """
+        tests xcom fetch behavior with different execution dates, using
+        both xcom_pull with "include_prior_dates" and without
+        """
+        key = 'xcom_key'
+        value = 'xcom_value'
+
+        dag = models.DAG(dag_id='test_xcom', schedule_interval='@monthly')
+        task = DummyOperator(
+            task_id='test_xcom',
+            dag=dag,
+            pool='test_xcom',
+            owner='airflow',
+            start_date=datetime.datetime(2016, 6, 2, 0, 0, 0))
+        exec_date = datetime.datetime.now()
+        ti = TI(
+            task=task, execution_date=exec_date)
+        ti.run(mark_success=True)
+        ti.xcom_push(key=key, value=value)
+        self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
+        ti.run()
+        exec_date = exec_date.replace(day=exec_date.day + 1)
+        ti = TI(
+            task=task, execution_date=exec_date)
+        ti.run()
+        # We have set a new execution date (and did not pass in
+        # 'include_prior_da

[jira] [Commented] (AIRFLOW-184) Add clear/mark success to CLI

2016-06-06 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317223#comment-15317223
 ] 

Chris Riccomini commented on AIRFLOW-184:
-----------------------------------------

{quote}
Ideally, this should need to be queued indeed
{quote}

Agreed, but would rather first get the CLI and UI in sync. Once they are the 
same, we can think about how we make things more async/scalable.

> Add clear/mark success to CLI
> -
>
> Key: AIRFLOW-184
> URL: https://issues.apache.org/jira/browse/AIRFLOW-184
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: cli
>Reporter: Chris Riccomini
>Assignee: Joy Gao
>
> AIRFLOW-177 pointed out that the current CLI does not allow us to clear or 
> mark success a task (including upstream, downstream, past, future, and 
> recursive) the way that the UI widget does. Given a goal of keeping parity 
> between the UI and CLI, it seems like we should support this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-184) Add clear/mark success to CLI

2016-06-06 Thread Arthur Wiedmer (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317196#comment-15317196
 ] 

Arthur Wiedmer commented on AIRFLOW-184:


Sounds good to me.

Ideally, this should be queued indeed. Should the mark_success command just be 
a wrapper around a more general set_state?

Marking a large swath of tasks as success is a pain in the UI, and the backfill 
with regex matching was useful for this. But I agree that it does not make 
sense anymore and should be refactored into something more useful that does 
not go through the scheduler, as that is a waste of slots.
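The "wrapper around a more general set_state" idea floated above can be sketched in a few lines. Every name here is hypothetical, not Airflow's actual CLI or model API; it only shows the shape being proposed.

```python
from dataclasses import dataclass

@dataclass
class Task:
    """Hypothetical minimal task record, not Airflow's TaskInstance."""
    task_id: str
    state: str = "none"

def set_state(tasks, state):
    """Generic state setter the specific commands would delegate to.
    Real selection flags (upstream/downstream/past/future/recursive)
    are elided in this sketch."""
    for t in tasks:
        t.state = state
    return tasks

def mark_success(tasks):
    # mark_success is then just a one-line wrapper over set_state
    return set_state(tasks, "success")

def clear(tasks):
    # ...and so is clear, keeping CLI and UI semantics in one place
    return set_state(tasks, "none")

tasks = [Task("extract"), Task("load")]
mark_success(tasks)
print([t.state for t in tasks])   # ['success', 'success']
clear(tasks)
print([t.state for t in tasks])   # ['none', 'none']
```

The appeal of this shape is exactly what the thread argues: no worker or scheduler involvement is needed just to flip states.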

> Add clear/mark success to CLI
> -
>
> Key: AIRFLOW-184
> URL: https://issues.apache.org/jira/browse/AIRFLOW-184
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: cli
>Reporter: Chris Riccomini
>Assignee: Joy Gao
>
> AIRFLOW-177 pointed out that the current CLI does not allow us to clear or 
> mark success a task (including upstream, downstream, past, future, and 
> recursive) the way that the UI widget does. Given a goal of keeping parity 
> between the UI and CLI, it seems like we should support this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-139) Executing VACUUM with PostgresOperator

2016-06-06 Thread Matt Land (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317192#comment-15317192
 ] 

Matt Land commented on AIRFLOW-139:
-----------------------------------

This is for our redshift connection. Based on their docs, "Amazon Redshift is 
based on PostgreSQL 8.0.2"

http://docs.aws.amazon.com/redshift/latest/dg/c_redshift-and-postgres-sql.html

> Executing VACUUM with PostgresOperator
> --
>
> Key: AIRFLOW-139
> URL: https://issues.apache.org/jira/browse/AIRFLOW-139
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.7.0
>Reporter: Rafael
>
> Dear Airflow Maintainers,
> h1. Environment
> * Airflow version: *v1.7.0*
> * Airflow components: *PostgresOperator*
> * Python Version: *Python 3.5.1*
> * Operating System: *15.4.0 Darwin*
> h1. Description of Issue
> I am trying to execute a `VACUUM` command as part of a DAG with the 
> `PostgresOperator`, which fails with the following error:
> {quote}
> [2016-05-14 16:14:01,849] {__init__.py:36} INFO - Using executor SequentialExecutor
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 15, in 
>     args.func(args)
>   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/bin/cli.py", line 203, in run
>     pool=args.pool,
>   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/models.py", line 1067, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.5/site-packages/airflow/operators/postgres_operator.py", line 39, in execute
>     self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
>   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py", line 109, in run
>     cur.execute(s)
> psycopg2.InternalError: VACUUM cannot run inside a transaction block
> {quote}
> I could create a small python script that performs the operation, as 
> explained in [this stackoverflow 
> entry](http://stackoverflow.com/questions/1017463/postgresql-how-to-run-vacuum-from-code-outside-transaction-block).
>  However, I would like to know first if the `VACUUM` command should be 
> supported by the `PostgresOperator`.
> h1. Reproducing the Issue
> The operator can be declared as follows:
> {quote}
> conn = ('postgres_default')
> t4 = PostgresOperator(
> task_id='vacuum',
> postgres_conn_id=conn,
> sql=("VACUUM public.table"),
> dag=dag
> )
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-139) Executing VACUUM with PostgresOperator

2016-06-06 Thread Matt Land (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317188#comment-15317188
 ] 

Matt Land commented on AIRFLOW-139:
-----------------------------------

This looks like it was changed in 
https://github.com/apache/incubator-airflow/commit/28da05d860147b5e0df37d998f437af6a5d4d178#diff-52dbcc045791a2a6baea120e05f573b5

When I manually set line 14 back to "supports_autocommit = True", the dag is 
able to vacuum when connected to my redshift cluster.
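The autocommit distinction behind `supports_autocommit` can be demonstrated with the stdlib `sqlite3` module, which enforces the same rule as Postgres and Redshift: VACUUM refuses to run inside an open transaction. This is an analogy under that assumption, not psycopg2 or Airflow's DbApiHook itself.

```python
import sqlite3

# Default mode: the INSERT opens an implicit transaction, so VACUUM,
# like in Postgres, cannot run until it is committed.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t VALUES (1)")   # implicit transaction begins here
try:
    conn.execute("VACUUM")
except sqlite3.OperationalError as exc:
    # e.g. "cannot VACUUM from within a transaction"
    print("inside transaction:", exc)
conn.commit()

# Autocommit mode (isolation_level=None) mirrors supports_autocommit=True:
# each statement commits on its own, so VACUUM has no enclosing transaction.
auto = sqlite3.connect(":memory:", isolation_level=None)
auto.execute("CREATE TABLE t (x INTEGER)")
auto.execute("INSERT INTO t VALUES (1)")
auto.execute("VACUUM")
print("vacuum ok in autocommit mode")
```

That is the effect of flipping `supports_autocommit` back to `True` described above: the hook stops wrapping the statement in a transaction, so the server accepts VACUUM.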



> Executing VACUUM with PostgresOperator
> --
>
> Key: AIRFLOW-139
> URL: https://issues.apache.org/jira/browse/AIRFLOW-139
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.7.0
>Reporter: Rafael
>
> Dear Airflow Maintainers,
> h1. Environment
> * Airflow version: *v1.7.0*
> * Airflow components: *PostgresOperator*
> * Python Version: *Python 3.5.1*
> * Operating System: *15.4.0 Darwin*
> h1. Description of Issue
> I am trying to execute a `VACUUM` command as part of a DAG with the 
> `PostgresOperator`, which fails with the following error:
> {quote}
> [2016-05-14 16:14:01,849] {__init__.py:36} INFO - Using executor SequentialExecutor
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 15, in 
>     args.func(args)
>   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/bin/cli.py", line 203, in run
>     pool=args.pool,
>   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/models.py", line 1067, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.5/site-packages/airflow/operators/postgres_operator.py", line 39, in execute
>     self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
>   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py", line 109, in run
>     cur.execute(s)
> psycopg2.InternalError: VACUUM cannot run inside a transaction block
> {quote}
> I could create a small python script that performs the operation, as 
> explained in [this stackoverflow 
> entry](http://stackoverflow.com/questions/1017463/postgresql-how-to-run-vacuum-from-code-outside-transaction-block).
>  However, I would like to know first if the `VACUUM` command should be 
> supported by the `PostgresOperator`.
> h1. Reproducing the Issue
> The operator can be declared as follows:
> {quote}
> conn = ('postgres_default')
> t4 = PostgresOperator(
> task_id='vacuum',
> postgres_conn_id=conn,
> sql=("VACUUM public.table"),
> dag=dag
> )
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (AIRFLOW-184) Add clear/mark success to CLI

2016-06-06 Thread Dan Davydov (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317128#comment-15317128
 ] 

Dan Davydov edited comment on AIRFLOW-184 at 6/6/16 8:39 PM:
-------------------------------------------------------------

I agree that mark_success should be moved into a separate command. I don't 
think the command even needs to interact with workers at all like the current 
way backfill + mark_success works, and we could also simplify a lot of the 
code. I think it is ok to break backwards compatibility here without bumping to 
a major release risk-wise but I think 1-2 other committers should +1 this since 
it's a bit of a grey area. Otherwise we can add the new --mark-success command, 
deprecate backfill --mark-success and break compatibility in 2.0.


was (Author: aoen):
I agree that mark_success should be moved into a separate command. I don't 
think the command even needs to interact with workers at all like the current 
way backfill + mark_success works, and we could also simplify a lot of the 
code. I think it is ok to break backwards incompatibility here without bumping 
to a major release risk-wise but I think 1-2 other committers should +1 this 
since it's a bit of a grey area. Otherwise we can add the new --mark-success 
command, deprecate backfill --mark-success and break compatibility in 2.0.

> Add clear/mark success to CLI
> -
>
> Key: AIRFLOW-184
> URL: https://issues.apache.org/jira/browse/AIRFLOW-184
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: cli
>Reporter: Chris Riccomini
>Assignee: Joy Gao
>
> AIRFLOW-177 pointed out that the current CLI does not allow us to clear or 
> mark success a task (including upstream, downstream, past, future, and 
> recursive) the way that the UI widget does. Given a goal of keeping parity 
> between the UI and CLI, it seems like we should support this.





[jira] [Commented] (AIRFLOW-139) Executing VACUUM with PostgresOperator

2016-06-06 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317185#comment-15317185
 ] 

Chris Riccomini commented on AIRFLOW-139:
-

Random question: what versions of postgres are you running?

> Executing VACUUM with PostgresOperator
> --
>
> Key: AIRFLOW-139
> URL: https://issues.apache.org/jira/browse/AIRFLOW-139
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.7.0
>Reporter: Rafael
>
> Dear Airflow Maintainers,
> h1. Environment
> * Airflow version: *v1.7.0*
> * Airflow components: *PostgresOperator*
> * Python Version: *Python 3.5.1*
> * Operating System: *15.4.0 Darwin*
> h1. Description of Issue
> I am trying to execute a `VACUUM` command as part of a DAG with the 
> `PostgresOperator`, which fails with the following error:
> {quote}
> [2016-05-14 16:14:01,849] {__init__.py:36} INFO - Using executor 
> SequentialExecutor
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 15, in <module>
> args.func(args)
>   File 
> "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/bin/cli.py",
>  line 203, in run
> pool=args.pool,
>   File 
> "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/models.py",
>  line 1067, in run
> result = task_copy.execute(context=context)
>   File 
> "/usr/local/lib/python3.5/site-packages/airflow/operators/postgres_operator.py",
>  line 39, in execute
> self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
>   File 
> "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py",
>  line 109, in run
> cur.execute(s)
> psycopg2.InternalError: VACUUM cannot run inside a transaction block
> {quote}
> I could create a small python script that performs the operation, as 
> explained in [this stackoverflow 
> entry](http://stackoverflow.com/questions/1017463/postgresql-how-to-run-vacuum-from-code-outside-transaction-block).
>  However, I would like to know first if the `VACUUM` command should be 
> supported by the `PostgresOperator`.
> h1. Reproducing the Issue
> The operator can be declared as follows:
> {quote}
> conn = 'postgres_default'
> t4 = PostgresOperator(
>     task_id='vacuum',
>     postgres_conn_id=conn,
>     sql="VACUUM public.table",
>     dag=dag
> )
> {quote}
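The psycopg2 error above occurs because VACUUM is one of a handful of PostgreSQL statements that refuse to run inside a transaction block; with psycopg2 the usual workaround is to set `connection.autocommit = True` before executing the statement. A minimal sketch of how a hook might decide when to do that — the `needs_autocommit` helper is purely illustrative, not Airflow or psycopg2 API:

```python
# PostgreSQL statements that cannot run inside a transaction block.
# With psycopg2, the caller would set connection.autocommit = True
# before executing them; this helper only makes that decision.
NON_TRANSACTIONAL_PREFIXES = (
    "VACUUM",
    "CREATE DATABASE",
    "DROP DATABASE",
    "ALTER SYSTEM",
)

def needs_autocommit(sql: str) -> bool:
    """Return True if the statement must run outside a transaction."""
    return sql.lstrip().upper().startswith(NON_TRANSACTIONAL_PREFIXES)

print(needs_autocommit("VACUUM public.table"))  # True
print(needs_autocommit("SELECT 1"))             # False
```

With something like this, a DbApiHook-style `run` method could flip autocommit for such statements itself, rather than relying on the user-supplied `autocommit` flag (which, per the comments in this thread, did not help in 1.7.x).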





[jira] [Commented] (AIRFLOW-184) Add clear/mark success to CLI

2016-06-06 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317184#comment-15317184
 ] 

Chris Riccomini commented on AIRFLOW-184:
-

{quote}
I don't think the command even needs to interact with workers at all like the 
current way backfill + mark_success works
{quote}

+1

{quote}
I think it is ok to break backwards compatibility here without bumping to a 
major release risk-wise but I think 1-2 other committers should +1 this since 
it's a bit of a grey area.
{quote}

Sounds good. So far we've got your +1 and mine. Could you do a quick ping from 
[~artwr] or [~maxime.beauche...@apache.org]? And we can get one of [~bolke] or 
[~jlowin] to +1.

> Add clear/mark success to CLI
> -
>
> Key: AIRFLOW-184
> URL: https://issues.apache.org/jira/browse/AIRFLOW-184
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: cli
>Reporter: Chris Riccomini
>Assignee: Joy Gao
>
> AIRFLOW-177 pointed out that the current CLI does not allow us to clear or 
> mark success a task (including upstream, downstream, past, future, and 
> recursive) the way that the UI widget does. Given a goal of keeping parity 
> between the UI and CLI, it seems like we should support this.





[jira] [Reopened] (AIRFLOW-139) Executing VACUUM with PostgresOperator

2016-06-06 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini reopened AIRFLOW-139:
-

> Executing VACUUM with PostgresOperator
> --
>
> Key: AIRFLOW-139
> URL: https://issues.apache.org/jira/browse/AIRFLOW-139
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.7.0
>Reporter: Rafael
>
> Dear Airflow Maintainers,
> h1. Environment
> * Airflow version: *v1.7.0*
> * Airflow components: *PostgresOperator*
> * Python Version: *Python 3.5.1*
> * Operating System: *15.4.0 Darwin*
> h1. Description of Issue
> I am trying to execute a `VACUUM` command as part of a DAG with the 
> `PostgresOperator`, which fails with the following error:
> {quote}
> [2016-05-14 16:14:01,849] {__init__.py:36} INFO - Using executor 
> SequentialExecutor
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 15, in <module>
> args.func(args)
>   File 
> "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/bin/cli.py",
>  line 203, in run
> pool=args.pool,
>   File 
> "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/models.py",
>  line 1067, in run
> result = task_copy.execute(context=context)
>   File 
> "/usr/local/lib/python3.5/site-packages/airflow/operators/postgres_operator.py",
>  line 39, in execute
> self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
>   File 
> "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py",
>  line 109, in run
> cur.execute(s)
> psycopg2.InternalError: VACUUM cannot run inside a transaction block
> {quote}
> I could create a small python script that performs the operation, as 
> explained in [this stackoverflow 
> entry](http://stackoverflow.com/questions/1017463/postgresql-how-to-run-vacuum-from-code-outside-transaction-block).
>  However, I would like to know first if the `VACUUM` command should be 
> supported by the `PostgresOperator`.
> h1. Reproducing the Issue
> The operator can be declared as follows:
> {quote}
> conn = 'postgres_default'
> t4 = PostgresOperator(
>     task_id='vacuum',
>     postgres_conn_id=conn,
>     sql="VACUUM public.table",
>     dag=dag
> )
> {quote}





[jira] [Commented] (AIRFLOW-139) Executing VACUUM with PostgresOperator

2016-06-06 Thread Matt Land (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317165#comment-15317165
 ] 

Matt Land commented on AIRFLOW-139:
---

Confirming this issue, we are getting the same failure under v1.7.1.2 with 
"autocommit=True" on the task.

> Executing VACUUM with PostgresOperator
> --
>
> Key: AIRFLOW-139
> URL: https://issues.apache.org/jira/browse/AIRFLOW-139
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.7.0
>Reporter: Rafael
>
> Dear Airflow Maintainers,
> h1. Environment
> * Airflow version: *v1.7.0*
> * Airflow components: *PostgresOperator*
> * Python Version: *Python 3.5.1*
> * Operating System: *15.4.0 Darwin*
> h1. Description of Issue
> I am trying to execute a `VACUUM` command as part of a DAG with the 
> `PostgresOperator`, which fails with the following error:
> {quote}
> [2016-05-14 16:14:01,849] {__init__.py:36} INFO - Using executor 
> SequentialExecutor
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 15, in <module>
> args.func(args)
>   File 
> "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/bin/cli.py",
>  line 203, in run
> pool=args.pool,
>   File 
> "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/models.py",
>  line 1067, in run
> result = task_copy.execute(context=context)
>   File 
> "/usr/local/lib/python3.5/site-packages/airflow/operators/postgres_operator.py",
>  line 39, in execute
> self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
>   File 
> "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py",
>  line 109, in run
> cur.execute(s)
> psycopg2.InternalError: VACUUM cannot run inside a transaction block
> {quote}
> I could create a small python script that performs the operation, as 
> explained in [this stackoverflow 
> entry](http://stackoverflow.com/questions/1017463/postgresql-how-to-run-vacuum-from-code-outside-transaction-block).
>  However, I would like to know first if the `VACUUM` command should be 
> supported by the `PostgresOperator`.
> h1. Reproducing the Issue
> The operator can be declared as follows:
> {quote}
> conn = 'postgres_default'
> t4 = PostgresOperator(
>     task_id='vacuum',
>     postgres_conn_id=conn,
>     sql="VACUUM public.table",
>     dag=dag
> )
> {quote}





[jira] [Commented] (AIRFLOW-184) Add clear/mark success to CLI

2016-06-06 Thread Dan Davydov (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317128#comment-15317128
 ] 

Dan Davydov commented on AIRFLOW-184:
-

I agree that mark_success should be moved into a separate command. I don't 
think the command even needs to interact with workers at all like the current 
way backfill + mark_success works, and we could also simplify a lot of the 
code. I think it is ok to break backwards compatibility here without bumping 
to a major release risk-wise but I think 1-2 other committers should +1 this 
since it's a bit of a grey area. Otherwise we can add the new --mark-success 
command, deprecate backfill --mark-success and break compatibility in 2.0.

> Add clear/mark success to CLI
> -
>
> Key: AIRFLOW-184
> URL: https://issues.apache.org/jira/browse/AIRFLOW-184
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: cli
>Reporter: Chris Riccomini
>Assignee: Joy Gao
>
> AIRFLOW-177 pointed out that the current CLI does not allow us to clear or 
> mark success a task (including upstream, downstream, past, future, and 
> recursive) the way that the UI widget does. Given a goal of keeping parity 
> between the UI and CLI, it seems like we should support this.





[jira] [Updated] (AIRFLOW-215) Airflow worker (CeleryExecutor) needs to be restarted to pick up tasks

2016-06-06 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini updated AIRFLOW-215:

Component/s: celery

> Airflow worker (CeleryExecutor) needs to be restarted to pick up tasks
> --
>
> Key: AIRFLOW-215
> URL: https://issues.apache.org/jira/browse/AIRFLOW-215
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery
>Affects Versions: Airflow 1.7.1.2
>Reporter: Cyril Scetbon
>
> We have a main dag that dynamically creates subdags containing tasks using 
> BashOperator. Using CeleryExecutor we see Celery tasks being created with 
> *STARTED* status, but they are not picked up by our worker. However, if we 
> restart our worker, then tasks are picked up. 
> Here you can find code if you want to try to reproduce it 
> https://www.dropbox.com/s/8u7xf8jt55v8zio/dags.zip.
> We also tested using LocalExecutor and everything worked fine.





[jira] [Created] (AIRFLOW-215) Airflow worker (CeleryExecutor) needs to be restarted to pick up tasks

2016-06-06 Thread Cyril Scetbon (JIRA)
Cyril Scetbon created AIRFLOW-215:
-

 Summary: Airflow worker (CeleryExecutor) needs to be restarted to 
pick up tasks
 Key: AIRFLOW-215
 URL: https://issues.apache.org/jira/browse/AIRFLOW-215
 Project: Apache Airflow
  Issue Type: Bug
Affects Versions: Airflow 1.7.1.2
Reporter: Cyril Scetbon


We have a main dag that dynamically creates subdags containing tasks using 
BashOperator. Using CeleryExecutor we see Celery tasks being created with 
*STARTED* status, but they are not picked up by our worker. However, if we 
restart our worker, then tasks are picked up. 

Here you can find code if you want to try to reproduce it 
https://www.dropbox.com/s/8u7xf8jt55v8zio/dags.zip.

We also tested using LocalExecutor and everything worked fine.





[jira] [Commented] (AIRFLOW-214) TaskInstance can get detached in process_dag

2016-06-06 Thread Bolke de Bruin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317063#comment-15317063
 ] 

Bolke de Bruin commented on AIRFLOW-214:


The PR works, that is for sure. I do not understand why the task can get 
detached in the first place. The session should be consistent, and it happens 
right after dagrun.get_task_instances. It does not happen every time, but I 
could replicate it with example_skip_dag; I couldn't fix it by keeping the 
current session.

> TaskInstance can get detached in process_dag
> 
>
> Key: AIRFLOW-214
> URL: https://issues.apache.org/jira/browse/AIRFLOW-214
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Bolke de Bruin
> Fix For: Airflow 1.8
>
>
> In some rare occasions the TaskInstance can get detached in process_dag. It 
> is unclear why.
> {code}[2016-06-06 17:47:37,048] {models.py:3444} INFO - Updating state for 
> <DagRun … scheduled__2016-06-05T00:00:00, externally triggered: False> considering 8 
> task(s)
> [2016-06-06 17:47:37,059] {jobs.py:670} ERROR - Instance <TaskInstance at 
> 0x10d3e9410> is not bound to a Session; attribute refresh operation cannot 
> proceed
> Traceback (most recent call last):
>   File 
> "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.1.2-py2.7.egg/airflow/jobs.py",
>  line 667, in _do_dags
> self.process_dag(dag, tis_out)
>   File 
> "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.1.2-py2.7.egg/airflow/jobs.py",
>  line 531, in process_dag
> task = dag.get_task(ti.task_id)
>   File 
> "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/attributes.py",
>  line 237, in __get__
> return self.impl.get(instance_state(instance), dict_)
>   File 
> "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/attributes.py",
>  line 578, in get
> value = state._load_expired(state, passive)
>   File 
> "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/state.py",
>  line 474, in _load_expired
> self.manager.deferred_scalar_loader(self, toload)
>   File 
> "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
>  line 610, in load_scalar_attributes
> (state_str(state)))
> DetachedInstanceError: Instance <TaskInstance at 0x10d3e9410> is not bound to 
> a Session; attribute refresh operation cannot proceed{code}





[jira] [Created] (AIRFLOW-214) TaskInstance can get detached in process_dag

2016-06-06 Thread Bolke de Bruin (JIRA)
Bolke de Bruin created AIRFLOW-214:
--

 Summary: TaskInstance can get detached in process_dag
 Key: AIRFLOW-214
 URL: https://issues.apache.org/jira/browse/AIRFLOW-214
 Project: Apache Airflow
  Issue Type: Bug
  Components: scheduler
Reporter: Bolke de Bruin
 Fix For: Airflow 1.8


In some rare occasions the TaskInstance can get detached in process_dag. It is 
unclear why.

{code}[2016-06-06 17:47:37,048] {models.py:3444} INFO - Updating state for 
<DagRun … scheduled__2016-06-05T00:00:00, externally triggered: False> considering 8 task(s)
[2016-06-06 17:47:37,059] {jobs.py:670} ERROR - Instance <TaskInstance at 
0x10d3e9410> is not bound to a Session; attribute refresh operation cannot 
proceed
Traceback (most recent call last):
  File 
"/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.1.2-py2.7.egg/airflow/jobs.py",
 line 667, in _do_dags
self.process_dag(dag, tis_out)
  File 
"/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.1.2-py2.7.egg/airflow/jobs.py",
 line 531, in process_dag
task = dag.get_task(ti.task_id)
  File 
"/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/attributes.py",
 line 237, in __get__
return self.impl.get(instance_state(instance), dict_)
  File 
"/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/attributes.py",
 line 578, in get
value = state._load_expired(state, passive)
  File 
"/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/state.py",
 line 474, in _load_expired
self.manager.deferred_scalar_loader(self, toload)
  File 
"/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
 line 610, in load_scalar_attributes
(state_str(state)))
DetachedInstanceError: Instance <TaskInstance at 0x10d3e9410> is not bound to a 
Session; attribute refresh operation cannot proceed{code}





[jira] [Closed] (AIRFLOW-9) Create Airflow website

2016-06-06 Thread Maxime Beauchemin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-9?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxime Beauchemin closed AIRFLOW-9.
---
Resolution: Fixed

Site is up here:
http://airflow.incubator.apache.org/

The static site is at 
`https://git-wip-us.apache.org/repos/asf/incubator-airflow-site.git` on branch 
`asf-site`.

To update, run `python setup.py build_sphinx`; this builds a static site under 
`docs/_build/html/`, which can be copied into the root of the repo/branch 
mentioned above.

Here's the same info in confluence:
https://cwiki.apache.org/confluence/display/AIRFLOW/Building+and+deploying+the+docs

> Create Airflow website
> --
>
> Key: AIRFLOW-9
> URL: https://issues.apache.org/jira/browse/AIRFLOW-9
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: project-management
>Reporter: Chris Riccomini
>
> We should set up an Airflow website for:
> http://airflow.incubator.apache.org/





[jira] [Commented] (AIRFLOW-184) Add clear/mark success to CLI

2016-06-06 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15316887#comment-15316887
 ] 

Chris Riccomini commented on AIRFLOW-184:
-

{quote}
As for the --only_failed flag, personally it is more intuitive to treat 
'upstream_failed' the same as 'failed' and have those tasks cleared as well 
(right now it is ignored).
{quote}

IMO, if we're shooting for parity with the UI, we should handle the state 
change regardless of what the upstream/downstream states are. I believe this is 
how the UI works. If I set "downstream", then it will clear all downstream 
states of the task(s), regardless of whether they're upstream_failed, success, 
failed, up_for_retry, or whatever.

{quote}
Just noticed that 'airflow backfill' and 'airflow run' both have a 
--mark_success flag which can be used to set Task Instances success just like 
the UI widget.
{quote}

I'm not a huge fan of the way that backfill works. I'd prefer to remove the 
ability for backfill to `mark success`, and just have a single place in the CLI 
to do this. [~jlowin]/[~bolke]: given that the UI currently fiddles with task 
state independently of the scheduler, is it OK to do this via the CLI as well 
(vs. going through the scheduler and skipping actual execution to mark state 
changes)?

My current vote is to:

# Remove the ability to mark success via backfill.
# Make `airflow clear` have parity with the web UI, which means supporting 
upstream/downstream/recursive the way that the UI does.

I'm not so concerned about past/present now, given that you can specify date 
ranges via

Note: (1) is backwards incompatible. I suspect the only people who might care 
about this are the Airbnb folks. [~maxime.beauche...@apache.org], [~pauly], 
[~aoen].
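For reference, the upstream/downstream selection the UI widget performs is essentially a graph traversal over task dependencies. A hypothetical pure-Python sketch of the selection logic a CLI `clear` could share with the UI — the function name and the dict-based DAG representation are illustrative, not Airflow's API:

```python
from collections import deque

def tasks_to_clear(downstream, task_id, include_downstream=False,
                   include_upstream=False):
    """BFS over the dependency graph, mirroring what the UI widget selects.

    `downstream` maps each task_id to the set of its direct downstream tasks.
    """
    # Invert the edges once so upstream traversal is also a simple BFS.
    upstream = {t: set() for t in downstream}
    for t, deps in downstream.items():
        for d in deps:
            upstream.setdefault(d, set()).add(t)

    selected = {task_id}
    for flag, graph in ((include_downstream, downstream),
                        (include_upstream, upstream)):
        if not flag:
            continue
        queue = deque([task_id])
        while queue:
            cur = queue.popleft()
            for nxt in graph.get(cur, ()):
                if nxt not in selected:
                    selected.add(nxt)
                    queue.append(nxt)
    return selected

dag = {"extract": {"transform"}, "transform": {"load"}, "load": set()}
print(sorted(tasks_to_clear(dag, "transform", include_downstream=True)))
```

Note the selection ignores task state entirely — clearing "transform" with include_downstream=True selects both "transform" and "load" whether they are success, failed, or upstream_failed, matching the UI semantics described above.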

> Add clear/mark success to CLI
> -
>
> Key: AIRFLOW-184
> URL: https://issues.apache.org/jira/browse/AIRFLOW-184
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: cli
>Reporter: Chris Riccomini
>Assignee: Joy Gao
>
> AIRFLOW-177 pointed out that the current CLI does not allow us to clear or 
> mark success a task (including upstream, downstream, past, future, and 
> recursive) the way that the UI widget does. Given a goal of keeping parity 
> between the UI and CLI, it seems like we should support this.





[jira] [Commented] (AIRFLOW-194) Task hangs in up_for_retry state for very long

2016-06-06 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15316827#comment-15316827
 ] 

Chris Riccomini commented on AIRFLOW-194:
-

[~miso], do you have an example DAG that you can share that replicates this 
issue?

> Task hangs in up_for_retry state for very long
> --
>
> Key: AIRFLOW-194
> URL: https://issues.apache.org/jira/browse/AIRFLOW-194
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Affects Versions: Airflow 1.7.0
> Environment: Airflow 1.7.0 on RHEL 7 and OpenSuse 13.2
>Reporter: Michal TOMA
>Assignee: Siddharth Anand
>
> I can observe this problem on 2 separate Airflow installations.
> The symptoms are:
> - One (and only one) task stays in up_for_retry state even when the last of 
> the retries finished with an OK status.
> - It is yellow in the tree view.
> - The execution somehow resumes several hours later automatically.
> - It seems (though not a certainty) related to a mode where the task 
> execution is "lagging" behind normal execution.
> Here is an example of a task that should run every hour "0 * * * *":
> Current date : 2016-05-30T15:31:00+0200
> - Run 1 --
> Run ID: 2016-05-05T21:00:00
> Task start: 2015-05-30T07:38:XX.XXX
> Task end: 2015-05-30T08:23:XX.XXX
> Marked as success
> - Run 2 --
> Run ID: 2016-05-05T22:00:00
> Task start: 2015-05-30T11:10:XX.XXX
> Task end: 2015-05-30T11:56:XX.XXX
> Marked as success
> - Run 3 --
> Run ID: 2016-05-05T23:00:00
> Task start: 2015-05-30T11:56:XX.XXX
> Task end: 2015-05-30T12:41:XX.XXX
> Marked as success
> - Run 4 --
> Run ID: 2016-05-06T00:00:00
> Task start: 2015-05-30T15:12:XX.XXX
> Task end: (Still running now)
> Marked as running
> There are nearly 2 hours between Run-1 and Run-2, and nearly 2 hours as well 
> between Run-3 and Run-4.
> Only Run-3 starts immediately after the end of Run-2, which is the expected 
> behavior as the runs are very late on schedule (Run ID is 2016-05-06 while we 
> are on 2016-05-30).
> This is a high priority issue for our setup. I could try to dig more in depth 
> into this problem but I have no idea where to look to debug this issue.
> Any pointers would be more than welcome.





[jira] [Commented] (AIRFLOW-194) Task hangs in up_for_retry state for very long

2016-06-06 Thread Michal TOMA (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15316417#comment-15316417
 ] 

Michal TOMA commented on AIRFLOW-194:
-

I tested with 1.7.1.2 and the problem is still present.
It seems to me that the delays between task runs are lower. Before (with 1.7.0) 
they were more like 2 hours; now (with 1.7.1.2) they seem to be more like 1 
hour, but I still have unexplainable delays between the end of a task run and 
the start of the next run.

> Task hangs in up_for_retry state for very long
> --
>
> Key: AIRFLOW-194
> URL: https://issues.apache.org/jira/browse/AIRFLOW-194
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Affects Versions: Airflow 1.7.0
> Environment: Airflow 1.7.0 on RHEL 7 and OpenSuse 13.2
>Reporter: Michal TOMA
>Assignee: Siddharth Anand
>
> I can observe this problem on 2 separate Airflow installations.
> The symptoms are:
> - One (and only one) task stays in up_for_retry state even when the last of 
> the retries finished with an OK status.
> - It is yellow in the tree view.
> - The execution somehow resumes several hours later automatically.
> - It seems (though not a certainty) related to a mode where the task 
> execution is "lagging" behind normal execution.
> Here is an example of a task that should run every hour "0 * * * *":
> Current date : 2016-05-30T15:31:00+0200
> - Run 1 --
> Run ID: 2016-05-05T21:00:00
> Task start: 2015-05-30T07:38:XX.XXX
> Task end: 2015-05-30T08:23:XX.XXX
> Marked as success
> - Run 2 --
> Run ID: 2016-05-05T22:00:00
> Task start: 2015-05-30T11:10:XX.XXX
> Task end: 2015-05-30T11:56:XX.XXX
> Marked as success
> - Run 3 --
> Run ID: 2016-05-05T23:00:00
> Task start: 2015-05-30T11:56:XX.XXX
> Task end: 2015-05-30T12:41:XX.XXX
> Marked as success
> - Run 4 --
> Run ID: 2016-05-06T00:00:00
> Task start: 2015-05-30T15:12:XX.XXX
> Task end: (Still running now)
> Marked as running
> There are nearly 2 hours between Run-1 and Run-2, and nearly 2 hours as well 
> between Run-3 and Run-4.
> Only Run-3 starts immediately after the end of Run-2, which is the expected 
> behavior as the runs are very late on schedule (Run ID is 2016-05-06 while we 
> are on 2016-05-30).
> This is a high priority issue for our setup. I could try to dig more in depth 
> into this problem but I have no idea where to look to debug this issue.
> Any pointers would be more than welcome.





[jira] [Commented] (AIRFLOW-91) Ssl gunicorn

2016-06-06 Thread Stanilovsky Evgeny (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15316300#comment-15316300
 ] 

Stanilovsky Evgeny commented on AIRFLOW-91:
---

https://github.com/apache/incubator-airflow/pull/1497/commits/50b99e3adff25bbdf4282d5e49e03257033efd17

> Ssl gunicorn
> 
>
> Key: AIRFLOW-91
> URL: https://issues.apache.org/jira/browse/AIRFLOW-91
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: security
>Reporter: Stanilovsky Evgeny
>Assignee: Stanilovsky Evgeny
>
> old issue : https://github.com/apache/incubator-airflow/pull/1492
> Ssl gunicorn support


