[jira] [Created] (AIRFLOW-1140) DatabricksSubmitRunOperator should template the "json" field.
Andrew Chen created AIRFLOW-1140: Summary: DatabricksSubmitRunOperator should template the "json" field. Key: AIRFLOW-1140 URL: https://issues.apache.org/jira/browse/AIRFLOW-1140 Project: Apache Airflow Issue Type: Improvement Reporter: Andrew Chen Assignee: Andrew Chen In the DatabricksSubmitRunOperator we should make sure to apply templating to the json parameter. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
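In Airflow, an operator opts a field into templating by listing it in `template_fields`; the scheduler then renders that field (with Jinja2) before `execute` runs. A stdlib-only sketch of the mechanism, using `string.Template` as a stand-in for Jinja2 — the class and method names here are illustrative, not the actual `DatabricksSubmitRunOperator`, and for simplicity the field is a string (Airflow also walks nested structures):

```python
from string import Template

class SubmitRunSketch:
    # AIRFLOW-1140 boils down to including 'json' in this tuple so the
    # templating engine renders it before execution.
    template_fields = ('json',)

    def __init__(self, json):
        self.json = json

    def render_template_fields(self, context):
        # Render every declared field against the task context.
        for field in self.template_fields:
            rendered = Template(getattr(self, field)).substitute(context)
            setattr(self, field, rendered)

op = SubmitRunSketch(json='{"notebook_params": {"ds": "$ds"}}')
op.render_template_fields({"ds": "2017-04-21"})
```

After rendering, `op.json` contains the execution date in place of the placeholder.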
[jira] [Commented] (AIRFLOW-1026) connection string using _cmd in airflow.cfg is broken
[ https://issues.apache.org/jira/browse/AIRFLOW-1026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979437#comment-15979437 ] Marc Weil commented on AIRFLOW-1026:

I'm having the same issue as well, and sadly this will prevent me from upgrading to 1.8.0 until it's resolved.

> connection string using _cmd in airflow.cfg is broken
> --
>
> Key: AIRFLOW-1026
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1026
> Project: Apache Airflow
> Issue Type: Bug
> Components: configuration
> Affects Versions: Airflow 1.8
> Reporter: Harish Singh
> Priority: Critical
> Fix For: Airflow 1.8
>
> sql_alchemy_conn_cmd = python ./pipeline/dags/configure.py
> I am expecting configure.py to be invoked.
> But it just throws:
> "cannot use sqlite with the LocalExecutor"
> The connection string that my script "configure.py" would return is something
> like this:
> mysql+mysqldb://username:**@mysqlhostname:3306/airflowdbname
> But after debugging, I found that my script is not getting invoked at all.
> This is my airflow.cfg:
> executor = LocalExecutor
> sql_alchemy_conn_cmd = python ./pipeline/dags/configure.py
> sql_alchemy_pool_size = 5
> sql_alchemy_pool_recycle = 3600
> I tried not using the script and directly hardcoding the conn_url:
> sql_alchemy_conn = mysql+mysqldb://username:**@mysqlhostname:3306/airflowdbname
> It works.
> But there is a regression bug if somebody wants to use "sql_alchemy_conn_cmd"

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
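For context, a `*_cmd` config option is meant to be resolved by running the given shell command and using its stdout as the value. A minimal stdlib-only sketch of that behaviour — `resolve_cmd_option` is a hypothetical helper, not Airflow's actual implementation:

```python
import subprocess

def resolve_cmd_option(command):
    # Mimics the intent of *_cmd options in airflow.cfg: run the command
    # in a shell and use its stripped stdout as the config value.
    return subprocess.check_output(command, shell=True).decode().strip()

# Hypothetical stand-in for "python ./pipeline/dags/configure.py", which
# would print a SQLAlchemy URL on stdout.
url = resolve_cmd_option("echo mysql+mysqldb://user:pw@host:3306/airflowdb")
```

The bug report above is that for `sql_alchemy_conn_cmd` this resolution step is never reached, so Airflow falls back to the default sqlite URL.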
[jira] [Commented] (AIRFLOW-1138) Add licenses to files in scripts directory
[ https://issues.apache.org/jira/browse/AIRFLOW-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979315#comment-15979315 ] ASF subversion and git services commented on AIRFLOW-1138: -- Commit 4b5c6efd4a450b4a202f87cb12ea1f9eb4daf8fc in incubator-airflow's branch refs/heads/v1-8-test from [~criccomini] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4b5c6ef ] [AIRFLOW-1138] Add missing licenses to files in scripts directory Closes #2253 from criccomini/AIRFLOW-1138 (cherry picked from commit 94f9822ffd867e559fd71046124626fee6acedf7) > Add licenses to files in scripts directory > -- > > Key: AIRFLOW-1138 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1138 > Project: Apache Airflow > Issue Type: Task > Components: release >Reporter: Chris Riccomini >Assignee: Chris Riccomini >Priority: Blocker > Fix For: 1.8.1 > > > These two files need license headers: > modified: scripts/ci/requirements.txt > modified: scripts/systemd/airflow -- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1138] Add missing licenses to files in scripts directory
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test dc6ebaab9 -> 4b5c6efd4

[AIRFLOW-1138] Add missing licenses to files in scripts directory

Closes #2253 from criccomini/AIRFLOW-1138

(cherry picked from commit 94f9822ffd867e559fd71046124626fee6acedf7)

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4b5c6efd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4b5c6efd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4b5c6efd

Branch: refs/heads/v1-8-test
Commit: 4b5c6efd4a450b4a202f87cb12ea1f9eb4daf8fc
Parents: dc6ebaa
Author: Chris Riccomini
Authored: Fri Apr 21 13:16:54 2017 -0700
Committer: Chris Riccomini
Committed: Fri Apr 21 13:20:06 2017 -0700
--
 scripts/ci/requirements.txt | 13 +
 scripts/systemd/airflow     | 13 +
 2 files changed, 26 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4b5c6efd/scripts/ci/requirements.txt
--
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 9a2bce2..0e0f980 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -1,3 +1,16 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 alembic
 bcrypt
 boto

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4b5c6efd/scripts/systemd/airflow
--
diff --git a/scripts/systemd/airflow b/scripts/systemd/airflow
index 87b79b9..5317548 100644
--- a/scripts/systemd/airflow
+++ b/scripts/systemd/airflow
@@ -1,3 +1,16 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 # This file is the environment file for Airflow. Put this file in /etc/sysconfig/airflow per default
 # configuration of the systemd unit files.
 #
[jira] [Closed] (AIRFLOW-1138) Add licenses to files in scripts directory
[ https://issues.apache.org/jira/browse/AIRFLOW-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini closed AIRFLOW-1138. Resolution: Fixed > Add licenses to files in scripts directory > -- > > Key: AIRFLOW-1138 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1138 > Project: Apache Airflow > Issue Type: Task > Components: release >Reporter: Chris Riccomini >Assignee: Chris Riccomini >Priority: Blocker > Fix For: 1.8.1 > > > These two files need license headers: > modified: scripts/ci/requirements.txt > modified: scripts/systemd/airflow -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1138) Add licenses to files in scripts directory
[ https://issues.apache.org/jira/browse/AIRFLOW-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979304#comment-15979304 ] ASF subversion and git services commented on AIRFLOW-1138: -- Commit 94f9822ffd867e559fd71046124626fee6acedf7 in incubator-airflow's branch refs/heads/master from [~criccomini] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=94f9822 ] [AIRFLOW-1138] Add missing licenses to files in scripts directory Closes #2253 from criccomini/AIRFLOW-1138 > Add licenses to files in scripts directory > -- > > Key: AIRFLOW-1138 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1138 > Project: Apache Airflow > Issue Type: Task > Components: release >Reporter: Chris Riccomini >Assignee: Chris Riccomini >Priority: Blocker > Fix For: 1.8.1 > > > These two files need license headers: > modified: scripts/ci/requirements.txt > modified: scripts/systemd/airflow -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-1139) Scheduler runs very slowly when many DAGs in DAG directory
David Vaughan created AIRFLOW-1139: -- Summary: Scheduler runs very slowly when many DAGs in DAG directory Key: AIRFLOW-1139 URL: https://issues.apache.org/jira/browse/AIRFLOW-1139 Project: Apache Airflow Issue Type: Improvement Affects Versions: 1.8.0 Environment: macOS Sierra, v10.12.2, MacBook Pro, 2.5 GHz Intel Core i7, 16 GB RAM Reporter: David Vaughan Priority: Minor When we have several (10-15) DAGs in our DAG directory, and each of them is pretty large (~900 tasks on average), Airflow's periodic re-processing of the DAGs in our DAG directory takes a long time and takes resources away from running DAGs. Almost always we only have one DAG actually running at any given time, and the rest are paused. The one running DAG, however, crawls along noticeably slower than if we only have one or two DAGs total in the DAG directory. I think it would be nice to have an option to turn off re-processing of DAGs completely, after the initial processing. The way we use Airflow right now, we don't edit our existing DAGs frequently, so we have no need for periodic refresh. We have experimented with the min_file_process_interval option in airflow.cfg, but setting it to small numbers causes no noticeable change, and setting it to very large numbers (to emulate not refreshing at all) actually causes the DAG to run much slower than it already was. Is anybody else still experiencing this? Are there existing ways to avoid this problem? Here are some links to people referencing, I believe, this same issue, but they're all from last year: https://issues.apache.org/jira/browse/AIRFLOW-160 https://github.com/apache/incubator-airflow/pull/1636 https://issues.apache.org/jira/browse/AIRFLOW-435 http://stackoverflow.com/questions/40466732/apache-airflow-scheduler-slowness Thanks in advance for any discussion or help. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
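For reference, the re-processing interval discussed above is controlled in the `[scheduler]` section of airflow.cfg; the value shown is illustrative, not a recommendation:

```ini
[scheduler]
# Minimum number of seconds to wait between re-processing the same DAG file.
# Raising this reduces scheduler load from repeated DAG parsing, but (as
# noted in this issue) very large values have caused runs to slow down
# rather than speed up in 1.8.0.
min_file_process_interval = 0
```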
[jira] [Updated] (AIRFLOW-1138) Add licenses to files in scripts directory
[ https://issues.apache.org/jira/browse/AIRFLOW-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini updated AIRFLOW-1138: - Priority: Blocker (was: Major) > Add licenses to files in scripts directory > -- > > Key: AIRFLOW-1138 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1138 > Project: Apache Airflow > Issue Type: Task > Components: release >Reporter: Chris Riccomini >Assignee: Chris Riccomini >Priority: Blocker > Fix For: 1.8.1 > > > These two files need license headers: > modified: scripts/ci/requirements.txt > modified: scripts/systemd/airflow -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-1138) Add licenses to files in scripts directory
Chris Riccomini created AIRFLOW-1138: Summary: Add licenses to files in scripts directory Key: AIRFLOW-1138 URL: https://issues.apache.org/jira/browse/AIRFLOW-1138 Project: Apache Airflow Issue Type: Task Components: release Reporter: Chris Riccomini Assignee: Chris Riccomini Fix For: 1.8.1 These two files need license headers: modified: scripts/ci/requirements.txt modified: scripts/systemd/airflow -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1131) DockerOperator jobs time out and get stuck in "running" forever
[ https://issues.apache.org/jira/browse/AIRFLOW-1131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978734#comment-15978734 ] Vitor Baptista commented on AIRFLOW-1131:

[~saguziel] Yes, I went to the host machine and listed the running Docker containers. The task wasn't running anymore, even though Airflow still thought it was.

{quote}
The reload task actually also fails because of
```
{models.py:1140} INFO - Dependencies not met for , dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2017-04-20 11:19:59.597425.
```
so it never actually gets run. The original continues to run in our case.
{quote}

By "in our case", do you mean you tried the example DAG I wrote in the issue? If so, with which Airflow version (or commit hash)? Just from the log message, it seems like it tried running the task again but failed because it was already running. That might have been the case at the time, as I wasn't monitoring closely when the task stopped running, but it stopped nonetheless.

I'm not sure how/where to debug this issue further. Any ideas?
> DockerOperator jobs time out and get stuck in "running" forever
> ---
>
> Key: AIRFLOW-1131
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1131
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.9.0
> Environment: Python 2.7.12
> git+git://github.com/apache/incubator-airflow.git@35e43f5067f4741640278b765c0e54e4fd45ffa3#egg=airflow[async,password,celery,crypto,postgres,hive,hdfs,jdbc]
> Reporter: Vitor Baptista
>
> With the following DAG and task:
> {code}
> import os
> from datetime import datetime, timedelta
> from airflow.models import DAG
> from airflow.operators.docker_operator import DockerOperator
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2017, 1, 1),
>     'retries': 3,
>     'retry_delay': timedelta(minutes=10),
> }
> dag = DAG(
>     dag_id='smoke_test',
>     default_args=default_args,
>     max_active_runs=1,
>     schedule_interval='@daily'
> )
> sleep_forever_task = DockerOperator(
>     task_id='sleep_forever',
>     dag=dag,
>     image='alpine:latest',
>     api_version=os.environ.get('DOCKER_API_VERSION', '1.23'),
>     command='sleep {}'.format(60 * 60 * 24),
> )
> {code}
> When I run it, this is what I get:
> {code}
> *** Log file isn't local.
> *** Fetching here: http://589ea17432ec:8793/log/smoke_test/sleep_forever/2017-04-18T00:00:00
> [2017-04-20 11:19:58,258] {models.py:172} INFO - Filling up the DagBag from /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 11:19:58,438] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run smoke_test sleep_forever 2017-04-18T00:00:00 --job_id 2537 --raw -sd DAGS_FOLDER/smoke_test.py']
> [2017-04-20 11:19:58,888] {base_task_runner.py:95} INFO - Subtask: /usr/local/airflow/src/airflow/airflow/configuration.py:128: DeprecationWarning: This method will be removed in future versions. Use 'parser.read_file()' instead.
> [2017-04-20 11:19:58,888] {base_task_runner.py:95} INFO - Subtask: self.readfp(StringIO.StringIO(string))
> [2017-04-20 11:19:59,214] {base_task_runner.py:95} INFO - Subtask: [2017-04-20 11:19:59,214] {__init__.py:56} INFO - Using executor CeleryExecutor
> [2017-04-20 11:19:59,227] {base_task_runner.py:95} INFO - Subtask: [2017-04-20 11:19:59,227] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
> [2017-04-20 11:19:59,244] {base_task_runner.py:95} INFO - Subtask: [2017-04-20 11:19:59,244] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> [2017-04-20 11:19:59,377] {base_task_runner.py:95} INFO - Subtask: [2017-04-20 11:19:59,377] {models.py:172} INFO - Filling up the DagBag from /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 11:19:59,597] {base_task_runner.py:95} INFO - Subtask: [2017-04-20 11:19:59,597] {models.py:1146} INFO - Dependencies all met for
> [2017-04-20 11:19:59,605] {base_task_runner.py:95} INFO - Subtask: [2017-04-20 11:19:59,605] {models.py:1146} INFO - Dependencies all met for
> [2017-04-20 11:19:59,605] {base_task_runner.py:95} INFO - Subtask: [2017-04-20 11:19:59,605] {models.py:1338} INFO -
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 4
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask:
> [2017-04-20 11:19:59,606] {base_task_runner.py:95}
[jira] [Commented] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded
[ https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978573#comment-15978573 ] gero commented on AIRFLOW-492: -- I fixed this by changing the def clean_dirty(dag_ids, session=None) inside models.py removed: 1. qry.delete(synchronize_session='fetch') 2. session.commit() right after qry.delete call changed: session.add(...) -> session.merge(DagStat(dag_id=dag_id, state=state, count=count, dirty=False)) Does this look right? > Insert into dag_stats table results into failed task while task itself > succeeded > > > Key: AIRFLOW-492 > URL: https://issues.apache.org/jira/browse/AIRFLOW-492 > Project: Apache Airflow > Issue Type: Bug >Reporter: Bolke de Bruin >Assignee: Siddharth Anand >Priority: Critical > Attachments: subdag_test.py > > > In some occasions there seem to be a duplicate key being inserted in > dag_stats that results in a task/dag run being marked failed while the task > itself has succeeded. > [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: > backfill_2016-04-21T00:00:00, externally triggered: False> successful > [2016-09-07 18:44:17,671] {models.py:1450} ERROR - > (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry > 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: > u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, > %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)] > Traceback (most recent call last): > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", > line 1409, in run > result = task_copy.execute(context=context) > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", > line 88, in execute > executor=self.executor) > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", > line 3244, in run > 
job.run() > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", > line 189, in run > self._execute() > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", > line 1855, in _execute > models.DagStat.clean_dirty([run.dag_id], session=session) > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", > line 54, in wrapper > result = func(*args, **kwargs) > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", > line 3695, in clean_dirty > session.commit() > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 801, in commit > self.transaction.commit() > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 392, in commit > self._prepare_impl() > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 372, in _prepare_impl > self.session.flush() > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 2019, in flush > self._flush(objects) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 2137, in _flush > transaction.rollback(_capture_exception=True) > File > "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line > 60, in __exit__ > compat.reraise(exc_type, exc_value, exc_tb) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 2101, in _flush > flush_context.execute() > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", > line 373, in execute > rec.execute(self) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", > line 532, in execute > uow > File > "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line > 174, in save_obj > mapper, table, insert) > File > "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line > 767, in 
_emit_insert_statements > execute(statement, multiparams) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", > line 914, in execute > return meth(self, multiparams, params) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", > line 323, in _execute_on_connection > return connection._execute_clauseelement(self, multiparams, params) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", > line 1010, in _execute_clauseelement > compiled_sql, distilled_params > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", > line 1146, in
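The difference gero describes can be illustrated without SQLAlchemy: `session.merge()` performs an insert-or-update keyed on the primary key, so a second flush for the same (dag_id, state) updates the existing row instead of violating the PRIMARY KEY constraint as a plain `session.add()` + INSERT does. A toy dict-based sketch of that semantics (not Airflow code; `upsert_stat` is a made-up name):

```python
def upsert_stat(table, dag_id, state, count):
    # merge()-style behaviour: insert the row if the primary key
    # (dag_id, state) is absent, otherwise update it in place.  An
    # add()-style plain INSERT would raise a duplicate-key
    # IntegrityError on the second call below.
    table[(dag_id, state)] = {"count": count, "dirty": False}
    return table

stats = {}
upsert_stat(stats, "my_dag", "success", 3)
upsert_stat(stats, "my_dag", "success", 4)  # updates, no duplicate-key error
```

This also removes the need for the delete-then-insert dance (`qry.delete()` followed by `session.add()`) that races when two processes clean the same dirty stats concurrently.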
[jira] [Commented] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded
[ https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978507#comment-15978507 ] Bolke de Bruin commented on AIRFLOW-492: Will have a look. In any circumstances updating the stats should never end up in failure of the task. > Insert into dag_stats table results into failed task while task itself > succeeded > > > Key: AIRFLOW-492 > URL: https://issues.apache.org/jira/browse/AIRFLOW-492 > Project: Apache Airflow > Issue Type: Bug >Reporter: Bolke de Bruin >Assignee: Siddharth Anand >Priority: Critical > Attachments: subdag_test.py > > > In some occasions there seem to be a duplicate key being inserted in > dag_stats that results in a task/dag run being marked failed while the task > itself has succeeded. > [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: > backfill_2016-04-21T00:00:00, externally triggered: False> successful > [2016-09-07 18:44:17,671] {models.py:1450} ERROR - > (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry > 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: > u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, > %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)] > Traceback (most recent call last): > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", > line 1409, in run > result = task_copy.execute(context=context) > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", > line 88, in execute > executor=self.executor) > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", > line 3244, in run > job.run() > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", > line 189, in run > self._execute() > File > 
"/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", > line 1855, in _execute > models.DagStat.clean_dirty([run.dag_id], session=session) > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", > line 54, in wrapper > result = func(*args, **kwargs) > File > "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", > line 3695, in clean_dirty > session.commit() > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 801, in commit > self.transaction.commit() > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 392, in commit > self._prepare_impl() > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 372, in _prepare_impl > self.session.flush() > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 2019, in flush > self._flush(objects) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 2137, in _flush > transaction.rollback(_capture_exception=True) > File > "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line > 60, in __exit__ > compat.reraise(exc_type, exc_value, exc_tb) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", > line 2101, in _flush > flush_context.execute() > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", > line 373, in execute > rec.execute(self) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", > line 532, in execute > uow > File > "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line > 174, in save_obj > mapper, table, insert) > File > "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line > 767, in _emit_insert_statements > execute(statement, multiparams) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", > line 914, in execute > return 
meth(self, multiparams, params) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", > line 323, in _execute_on_connection > return connection._execute_clauseelement(self, multiparams, params) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", > line 1010, in _execute_clauseelement > compiled_sql, distilled_params > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", > line 1146, in _execute_context > context) > File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", > line 1341, in _handle_dbapi_exception > exc_info > File
[jira] [Created] (AIRFLOW-1137) Problem installing [all] subpackages python3
Hamed created AIRFLOW-1137: -- Summary: Problem installing [all] subpackages python3 Key: AIRFLOW-1137 URL: https://issues.apache.org/jira/browse/AIRFLOW-1137 Project: Apache Airflow Issue Type: Bug Reporter: Hamed Priority: Minor

I am installing all packages for airflow in python3 using: {noformat}pip3 install 'airflow[all]'{noformat} but it throws the following error:

{code:xml}
Collecting cx_Oracle>=5.1.2 (from airflow[all])
  Downloading cx_Oracle-5.3.tar.gz (129kB)
    100% || 133kB 5.9MB/s
  Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "", line 1, in
      File "/private/tmp/pip-build-5re1trj4/cx-Oracle/setup.py", line 174, in
        raise DistutilsSetupError("cannot locate an Oracle software " \
    distutils.errors.DistutilsSetupError: cannot locate an Oracle software installation
{code}

I don't want to use the oracle subpackage, but it blocks the installation of the other packages.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
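The `all` extra pulls in cx_Oracle, whose build requires an Oracle client on the machine. A common workaround (the extras listed here are just an example; pick the ones you actually use) is to name the subpackages explicitly instead of `[all]`:

```shell
# Install only the extras needed, skipping the oracle subpackage
# and its cx_Oracle build dependency.
pip3 install 'airflow[celery,crypto,postgres,hive]'
```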
[jira] [Commented] (AIRFLOW-1136) Invalid parameters are not captured for Sqoop operators
[ https://issues.apache.org/jira/browse/AIRFLOW-1136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978441#comment-15978441 ] ASF subversion and git services commented on AIRFLOW-1136: -- Commit 2ef4dbbe0bf6e8ca116ad01bf209e7155d311d43 in incubator-airflow's branch refs/heads/master from [~hgrif] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=2ef4dbb ] [AIRFLOW-1136] Capture invalid arguments for Sqoop Invalid arguments are not captured for the SqoopHook and SqoopOperator: - SqoopHook should raise an exception if the file_type is invalid - SqoopOperator should raise an exception if the cmd_type is invalid Closes #2252 from hgrif/AIRFLOW-1136 > Invalid parameters are not captured for Sqoop operators > --- > > Key: AIRFLOW-1136 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1136 > Project: Apache Airflow > Issue Type: Bug >Reporter: Henk Griffioen >Assignee: Henk Griffioen >Priority: Minor > > 1. The Sqoophook exports data as text if argument file_type is anything other > than 'avro', 'sequence' or 'parquet'. Correct behaviour would be to error out > if file_type is not 'avro', 'sequence', 'parquet' or 'text'. > 2. SqoopOperator runs the import command if argument cmd_type is anything > other than 'export'. Correct behaviour would be to error out if cmd_type is > not 'import' or 'export'. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
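The pattern of the fix can be reduced to a small sketch: map each valid `file_type` to its sqoop flag and raise on anything unrecognised, instead of silently falling back to text. The sequence/parquet/text flags appear in the commit diff; the avro flag is sqoop's standard `--as-avrodatafile` and is assumed here, as is the standalone function name:

```python
def file_type_args(file_type):
    # Validation pattern from AIRFLOW-1136: an unknown file_type is an
    # error, not an implicit "--as-textfile".
    flags = {
        "avro": "--as-avrodatafile",
        "sequence": "--as-sequencefile",
        "parquet": "--as-parquetfile",
        "text": "--as-textfile",
    }
    if file_type not in flags:
        raise ValueError("Argument file_type should be 'avro', "
                         "'sequence', 'parquet' or 'text'.")
    return [flags[file_type]]
```

The same guard applies to `cmd_type`, which should only accept 'import' or 'export'.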
[jira] [Updated] (AIRFLOW-1136) Invalid parameters are not captured for Sqoop operators
[ https://issues.apache.org/jira/browse/AIRFLOW-1136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin updated AIRFLOW-1136: Component/s: operators > Invalid parameters are not captured for Sqoop operators > --- > > Key: AIRFLOW-1136 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1136 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: 1.8.1 >Reporter: Henk Griffioen >Assignee: Henk Griffioen >Priority: Minor > Fix For: 1.9.0, 1.8.2 > > > 1. The Sqoophook exports data as text if argument file_type is anything other > than 'avro', 'sequence' or 'parquet'. Correct behaviour would be to error out > if file_type is not 'avro', 'sequence', 'parquet' or 'text'. > 2. SqoopOperator runs the import command if argument cmd_type is anything > other than 'export'. Correct behaviour would be to error out if cmd_type is > not 'import' or 'export'. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1136] Capture invalid arguments for Sqoop
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 659827639 -> 2ef4dbbe0

[AIRFLOW-1136] Capture invalid arguments for Sqoop

Invalid arguments are not captured for the SqoopHook and SqoopOperator:
- SqoopHook should raise an exception if the file_type is invalid
- SqoopOperator should raise an exception if the cmd_type is invalid

Closes #2252 from hgrif/AIRFLOW-1136

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2ef4dbbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2ef4dbbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2ef4dbbe

Branch: refs/heads/master
Commit: 2ef4dbbe0bf6e8ca116ad01bf209e7155d311d43
Parents: 6598276
Author: Henk Griffioen
Authored: Fri Apr 21 12:08:48 2017 +0200
Committer: Bolke de Bruin
Committed: Fri Apr 21 12:08:48 2017 +0200
--
 airflow/contrib/hooks/sqoop_hook.py            | 50 ++-
 airflow/contrib/operators/sqoop_operator.py    |  6 +-
 tests/contrib/hooks/test_sqoop_hook.py         | 92 -
 tests/contrib/operators/test_sqoop_operator.py |  7 ++
 4 files changed, 91 insertions(+), 64 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2ef4dbbe/airflow/contrib/hooks/sqoop_hook.py
--
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index e1f4779..6c5ee58 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -28,21 +28,27 @@ log = logging.getLogger(__name__)

 class SqoopHook(BaseHook):
     """
-    This Hook is a wrapper around the sqoop 1 binary. To be able to use te hook
+    This hook is a wrapper around the sqoop 1 binary. To be able to use the hook
     it is required that "sqoop" is in the PATH.
-:param job_tracker: (from json) specify a job tracker local|jobtracker:port -:type job_tracker: str -:param namenode: (from json) specify a namenode -:type namenode: str -:param lib_jars: (from json) specify comma separated jar -files to include in the classpath. -:type lib_jars: str -:param files: (from json) specify comma separated files to be copied to -the map reduce cluster -:type files: (from json) str -:param archives: (from json) specify comma separated archives to be -unarchived on the compute machines. -:type archives: str + +Additional arguments that can be passed via the 'extra' JSON field of the +sqoop connection: +* job_tracker: Job tracker local|jobtracker:port. +* namenode: Namenode. +* lib_jars: Comma separated jar files to include in the classpath. +* files: Comma separated files to be copied to the map reduce cluster. +* archives: Comma separated archives to be unarchived on the compute +machines. +* password_file: Path to file containing the password. + +:param conn_id: Reference to the sqoop connection. +:type conn_id: str +:param verbose: Set sqoop to verbose. +:type verbose: bool +:param num_mappers: Number of map tasks to import in parallel. +:type num_mappers: str +:param properties: Properties to set via the -D argument +:type properties: dict """ def __init__(self, conn_id='sqoop_default', verbose=False, @@ -80,12 +86,11 @@ class SqoopHook(BaseHook): output, stderr = process.communicate() if process.returncode != 0: -raise AirflowException(( - "Cannot execute {} on {}. Error code is: {}" - "Output: {}, Stderr: {}" - ).format(cmd, self.conn.host, -process.returncode, output, -stderr)) +raise AirflowException( +"Cannot execute {} on {}. 
Error code is: {} Output: {}, " +"Stderr: {}".format(cmd, self.conn.host, process.returncode, +output, stderr) +) def _prepare_command(self, export=False): if export: @@ -132,8 +137,11 @@ class SqoopHook(BaseHook): return ["--as-sequencefile"] elif file_type == "parquet": return ["--as-parquetfile"] -else: +elif file_type == "text": return ["--as-textfile"] +else: +raise AirflowException("Argument file_type should be 'avro', " + "'sequence', 'parquet' or 'text'.") def _import_cmd(self, target_dir, append, file_type, split_by, direct, driver):
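The behaviour the patch introduces can be sketched as a minimal standalone snippet. This is a simplified stand-in, not the actual Airflow source: the function names and the AirflowException class here are reproduced only for illustration, and the avro flag is assumed to be sqoop's --as-avrodatafile.

```python
# Sketch of the validation introduced by AIRFLOW-1136 (simplified, not the
# real Airflow code): unknown values now raise instead of silently falling
# back to a default.

class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""

# file_type -> sqoop flag, mirroring SqoopHook after the fix.
FILE_TYPE_FLAGS = {
    "avro": "--as-avrodatafile",
    "sequence": "--as-sequencefile",
    "parquet": "--as-parquetfile",
    "text": "--as-textfile",
}

def file_type_argument(file_type):
    """Return the sqoop flag for file_type, raising on an unknown value."""
    try:
        return [FILE_TYPE_FLAGS[file_type]]
    except KeyError:
        raise AirflowException("Argument file_type should be 'avro', "
                               "'sequence', 'parquet' or 'text'.")

def run_sqoop(cmd_type):
    """Mirror the SqoopOperator fix: only 'import' and 'export' are valid."""
    if cmd_type not in ("import", "export"):
        raise AirflowException("cmd_type should be 'import' or 'export'.")
    return "sqoop {}".format(cmd_type)
```

Before the fix, the else branches silently mapped any unrecognised file_type to --as-textfile and any non-'export' cmd_type to an import, which is why the bug report asks for an error instead.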
[jira] [Commented] (AIRFLOW-1136) Invalid parameters are not captured for Sqoop operators
[ https://issues.apache.org/jira/browse/AIRFLOW-1136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978442#comment-15978442 ] ASF subversion and git services commented on AIRFLOW-1136: Commit 2ef4dbbe0bf6e8ca116ad01bf209e7155d311d43 in incubator-airflow's branch refs/heads/master from [~hgrif] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=2ef4dbb ] [AIRFLOW-1136] Capture invalid arguments for Sqoop Invalid arguments are not captured for the SqoopHook and SqoopOperator: - SqoopHook should raise an exception if the file_type is invalid - SqoopOperator should raise an exception if the cmd_type is invalid Closes #2252 from hgrif/AIRFLOW-1136
[jira] [Resolved] (AIRFLOW-1136) Invalid parameters are not captured for Sqoop operators
[ https://issues.apache.org/jira/browse/AIRFLOW-1136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-1136. Resolution: Fixed
[jira] [Updated] (AIRFLOW-1136) Invalid parameters are not captured for Sqoop operators
[ https://issues.apache.org/jira/browse/AIRFLOW-1136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin updated AIRFLOW-1136: Affects Version/s: 1.8.1
[jira] [Updated] (AIRFLOW-1136) Invalid parameters are not captured for Sqoop operators
[ https://issues.apache.org/jira/browse/AIRFLOW-1136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin updated AIRFLOW-1136: Fix Version/s: 1.9.0, 1.8.2
[jira] [Created] (AIRFLOW-1136) Invalid parameters are not captured for Sqoop operators
Henk Griffioen created AIRFLOW-1136: --- Summary: Invalid parameters are not captured for Sqoop operators Key: AIRFLOW-1136 URL: https://issues.apache.org/jira/browse/AIRFLOW-1136 Project: Apache Airflow Issue Type: Bug Reporter: Henk Griffioen Assignee: Henk Griffioen Priority: Minor

1. The SqoopHook exports data as text if argument file_type is anything other than 'avro', 'sequence' or 'parquet'. Correct behaviour would be to error out if file_type is not 'avro', 'sequence', 'parquet' or 'text'.

2. The SqoopOperator runs the import command if argument cmd_type is anything other than 'export'. Correct behaviour would be to error out if cmd_type is not 'import' or 'export'.
[jira] [Created] (AIRFLOW-1135) Clearing XCom data from previous executions
Ultrabug created AIRFLOW-1135: - Summary: Clearing XCom data from previous executions Key: AIRFLOW-1135 URL: https://issues.apache.org/jira/browse/AIRFLOW-1135 Project: Apache Airflow Issue Type: New Feature Components: db Reporter: Ultrabug

It looks like XCom data from previous task executions is never cleared from the database, and there is no easy way to do so. Since XComs are keyed by the execution_date of a task, a task that heavily uses XComs (20,000+ entries) can end up producing a large xcom table in the database. Is there something I'm missing about cleaning up those previous XComs (since they're not used anymore)? Would you accept a PR providing a helper to remove them? Thanks
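A cleanup helper along the lines the reporter suggests could look like the sketch below. This is purely hypothetical (no such helper exists in the Airflow 1.8 line): Airflow manages its metadata database through SQLAlchemy, whereas here a stdlib sqlite3 database stands in, with a simplified version of the xcom table keyed by execution_date as the issue describes.

```python
# Hypothetical cleanup helper for AIRFLOW-1135 (not part of Airflow): delete
# xcom rows from executions older than a cutoff date. The table schema is a
# simplified stand-in for Airflow's real xcom table.
import sqlite3

def clear_old_xcoms(conn, before_date):
    """Delete xcom rows whose execution_date is older than before_date
    (ISO-8601 string); return the number of rows removed."""
    cur = conn.execute(
        "DELETE FROM xcom WHERE execution_date < ?", (before_date,))
    conn.commit()
    return cur.rowcount

# Demo against an in-memory database with two executions.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE xcom (key TEXT, value BLOB, execution_date TEXT)")
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?, ?)",
    [("rows_loaded", b"100", "2017-01-01"),
     ("rows_loaded", b"200", "2017-04-20")])
deleted = clear_old_xcoms(conn, "2017-04-01")  # removes the 2017-01-01 row
```

Bulk-deleting by a date cutoff rather than row by row keeps the operation cheap even for the 20,000+ entry tables the reporter mentions, since the real table indexes execution_date as part of its key.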