[jira] [Commented] (AIRFLOW-1131) DockerOperator jobs time out and get stuck in "running" forever

2017-05-10 Thread Vitor Baptista (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004421#comment-16004421
 ] 

Vitor Baptista commented on AIRFLOW-1131:
-

I've been running Airflow with the {{SequentialExecutor}} for a while now, as I 
thought it would work around this error. I was wrong: I'm seeing the same issue 
in production with the {{SequentialExecutor}}. There's a task that's still in 
the {{running}} state whose Docker container exited with an error (exit code 2) 
a couple of days ago.
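
For anyone trying to reproduce the comparison, this is roughly how the container 
state can be checked on the host (a sketch using the pre-2.0 docker-py {{Client}} 
API that {{DockerOperator}} uses; the API version lookup mirrors the DAG quoted 
below):

{code}
import os

import docker

# Sketch: list all containers (including exited ones) and print Docker's view
# of their state, to compare with what Airflow reports for the task.
client = docker.Client(version=os.environ.get('DOCKER_API_VERSION', '1.23'))
for container in client.containers(all=True):
    state = client.inspect_container(container['Id'])['State']
    print(container['Image'], state['Running'], state['ExitCode'])
{code}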

> DockerOperator jobs time out and get stuck in "running" forever
> ---
>
> Key: AIRFLOW-1131
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1131
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Affects Versions: 1.9.0
> Environment: Python 2.7.12
> git+git://github.com/apache/incubator-airflow.git@35e43f5067f4741640278b765c0e54e4fd45ffa3#egg=airflow[async,password,celery,crypto,postgres,hive,hdfs,jdbc]
>Reporter: Vitor Baptista
>
> With the following DAG and task:
> {code}
> import os
> from datetime import datetime, timedelta
> from airflow.models import DAG
> from airflow.operators.docker_operator import DockerOperator
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2017, 1, 1),
>     'retries': 3,
>     'retry_delay': timedelta(minutes=10),
> }
> dag = DAG(
>     dag_id='smoke_test',
>     default_args=default_args,
>     max_active_runs=1,
>     schedule_interval='@daily'
> )
> sleep_forever_task = DockerOperator(
>     task_id='sleep_forever',
>     dag=dag,
>     image='alpine:latest',
>     api_version=os.environ.get('DOCKER_API_VERSION', '1.23'),
>     command='sleep {}'.format(60 * 60 * 24),
> )
> {code}
> When I run it, this is what I get:
> {code}
> *** Log file isn't local.
> *** Fetching here: 
> http://589ea17432ec:8793/log/smoke_test/sleep_forever/2017-04-18T00:00:00
> [2017-04-20 11:19:58,258] {models.py:172} INFO - Filling up the DagBag from 
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 11:19:58,438] {base_task_runner.py:112} INFO - Running: ['bash', 
> '-c', u'airflow run smoke_test sleep_forever 2017-04-18T00:00:00 --job_id 
> 2537 --raw -sd DAGS_FOLDER/smoke_test.py']
> [2017-04-20 11:19:58,888] {base_task_runner.py:95} INFO - Subtask: 
> /usr/local/airflow/src/airflow/airflow/configuration.py:128: 
> DeprecationWarning: This method will be removed in future versions.  Use 
> 'parser.read_file()' instead.
> [2017-04-20 11:19:58,888] {base_task_runner.py:95} INFO - Subtask:   
> self.readfp(StringIO.StringIO(string))
> [2017-04-20 11:19:59,214] {base_task_runner.py:95} INFO - Subtask: 
> [2017-04-20 11:19:59,214] {__init__.py:56} INFO - Using executor 
> CeleryExecutor
> [2017-04-20 11:19:59,227] {base_task_runner.py:95} INFO - Subtask: 
> [2017-04-20 11:19:59,227] {driver.py:120} INFO - Generating grammar tables 
> from /usr/lib/python2.7/lib2to3/Grammar.txt
> [2017-04-20 11:19:59,244] {base_task_runner.py:95} INFO - Subtask: 
> [2017-04-20 11:19:59,244] {driver.py:120} INFO - Generating grammar tables 
> from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> [2017-04-20 11:19:59,377] {base_task_runner.py:95} INFO - Subtask: 
> [2017-04-20 11:19:59,377] {models.py:172} INFO - Filling up the DagBag from 
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 11:19:59,597] {base_task_runner.py:95} INFO - Subtask: 
> [2017-04-20 11:19:59,597] {models.py:1146} INFO - Dependencies all met for 
> 
> [2017-04-20 11:19:59,605] {base_task_runner.py:95} INFO - Subtask: 
> [2017-04-20 11:19:59,605] {models.py:1146} INFO - Dependencies all met for 
> 
> [2017-04-20 11:19:59,605] {base_task_runner.py:95} INFO - Subtask: 
> [2017-04-20 11:19:59,605] {models.py:1338} INFO - 
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask: 
> 
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask: Starting 
> attempt 1 of 4
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask: 
> 
> [2017-04-20 11:19:59,606] {base_task_runner.py:95} INFO - Subtask: 
> [2017-04-20 11:19:59,620] {base_task_runner.py:95} INFO - Subtask: 
> [2017-04-20 11:19:59,620] {models.py:1362} INFO - Executing 
>  on 2017-04-18 00:00:00
> [2017-04-20 11:19:59,662] {base_task_runner.py:95} INFO - Subtask: 
> [2017-04-20 11:19:59,661] {docker_operator.py:132} INFO - Starting docker 
> container from image alpine:latest
> [2017-04-20 12:21:25,661] {models.py:172} INFO - Filling up the DagBag from 
> /usr/local/airflow/dags/smoke_test.py
> [2017-04-20 12:21:25,809] {base_task_runner.py:112} INFO - Running: ['bash', 
> '-c', u'airflow run smoke_test sleep_forever 2017-04-18T00:00:00 --job_id 
> 2574 --raw -sd DAGS_FOLDER/smoke_test.py']
> [2017-04-20 12:21:26,117] {base_task_runner.py:95} INFO - Subtask: 
> {code}

[jira] [Commented] (AIRFLOW-1131) DockerOperator jobs time out and get stuck in "running" forever

2017-04-28 Thread Vitor Baptista (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988774#comment-15988774
 ] 

Vitor Baptista commented on AIRFLOW-1131:
-

I tried the same sample task with the {{SequentialExecutor}} and the bug 
doesn't happen. Apparently this is somehow related to the {{CeleryExecutor}}.


[jira] [Commented] (AIRFLOW-1131) DockerOperator jobs time out and get stuck in "running" forever

2017-04-21 Thread Vitor Baptista (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15978734#comment-15978734
 ] 

Vitor Baptista commented on AIRFLOW-1131:
-

[~saguziel] Yes, I went to the host machine and listed the running Docker 
containers. The task wasn't running anymore, even though Airflow still thought 
it was.

{quote}
The reload task actually also fails because of
{noformat}
{models.py:1140} INFO - Dependencies not met for , dependency 'Task 
Instance Not Already Running' FAILED: Task is already running, it started on 
2017-04-20 11:19:59.597425.
{noformat}
so it never actually gets run. The original continues to run in our case.
{quote}

By "in our case", do you mean you tried the example DAG I wrote in the issue? If 
so, with which Airflow version (or commit hash)? Judging by the log message, it 
seems like it tried running the task again, but failed because it was already 
running. That might have been the case at the time, as I wasn't monitoring 
closely when the task stopped running, but it stopped nonetheless.

I'm not sure how/where to debug this issue further. Any ideas?
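
One cheap cross-check (a sketch against the Airflow metadata models; the DAG and 
task ids are the ones from the example DAG quoted at the top of this thread) is 
to list the task instances the database still considers running and compare them 
with {{docker ps -a}} on the worker:

{code}
from airflow import settings
from airflow.models import TaskInstance

# Sketch: task instances Airflow still marks as "running" for the example DAG.
session = settings.Session()
stuck = (session.query(TaskInstance)
         .filter(TaskInstance.dag_id == 'smoke_test',
                 TaskInstance.task_id == 'sleep_forever',
                 TaskInstance.state == 'running')
         .all())
for ti in stuck:
    print(ti.execution_date, ti.state, ti.start_date, ti.hostname)
{code}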


[jira] [Commented] (AIRFLOW-1131) DockerOperator jobs time out and get stuck in "running" forever

2017-04-20 Thread Vitor Baptista (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976982#comment-15976982
 ] 

Vitor Baptista commented on AIRFLOW-1131:
-

Ping [~saguziel]: @bolkedebruin on IRC said you might be interested in this 
issue.


[jira] [Updated] (AIRFLOW-1131) DockerOperator jobs time out and get stuck in "running" forever

2017-04-20 Thread Vitor Baptista (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitor Baptista updated AIRFLOW-1131:

Environment: 
Python 2.7.12

git+git://github.com/apache/incubator-airflow.git@35e43f5067f4741640278b765c0e54e4fd45ffa3#egg=airflow[async,password,celery,crypto,postgres,hive,hdfs,jdbc]

  was:
Python 2.7.12
git+git://github.com/apache/incubator-airflow.git@35e43f5067f4741640278b765c0e54e4fd45ffa3#egg=airflow[async,password,celery,crypto,postgres,hive,hdfs,jdbc]


[jira] [Created] (AIRFLOW-1088) DagBag's import_errors() contain quoted string messages

2017-04-07 Thread Vitor Baptista (JIRA)
Vitor Baptista created AIRFLOW-1088:
---

 Summary: DagBag's import_errors() contain quoted string messages
 Key: AIRFLOW-1088
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1088
 Project: Apache Airflow
  Issue Type: Bug
Affects Versions: 1.8.0
 Environment: Python 2.7, 3.5, 3.6
Reporter: Vitor Baptista
 Fix For: Airflow 1.7.1.3


For example, consider that we're loading a DAG folder that has a DAG requiring 
Variable {{FOO}} that doesn't exist, as follows:

{code}
dagbag = airflow.models.DagBag(dag_folder=dag_folder, include_examples=False)
dagbag.import_errors.values()
# ["u'Variable FOO does not exist'"]
{code}

Notice that the error message from {{dagbag.import_errors.values()}} is wrapped 
inside {{u'...'}}. The problem appears to be that we're converting the 
{{KeyError}} messages using {{str(exception)}} (see lines 272 and 305 on 
https://github.com/apache/incubator-airflow/blob/53ca5084561fd5c13996609f2eda6baf717249b5/airflow/models.py).

We can reproduce a similar error using:
{code}
message = 'error message'
error = KeyError(message)
assert message == str(error), '"{}" != "{}"'.format(message, str(error))
# Traceback (most recent call last):
#   File "<stdin>", line 1, in <module>
# AssertionError: "error message" != "'error message'"
{code}

I tested it on Python 2.7, 3.5 and 3.6 and got the same error, so my guess is 
that it affects Python 2.7+.

This problem doesn't occur with Airflow 1.7.1.3, although we use the same 
{{str(exception)}}, so my guess is that there's something different in how we 
throw the exception.
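
If we keep using {{str(exception)}}, one possible workaround (a sketch, not 
Airflow's actual code) is to unwrap {{KeyError}} before stringifying, since 
{{KeyError.__str__}} returns the {{repr()}} of the missing key, which is exactly 
where the extra quotes come from:

{code}
def exception_message(exception):
    # KeyError.__str__ returns repr(key), which adds the quotes; fall back
    # to plain str() for every other exception type.
    if isinstance(exception, KeyError) and exception.args:
        return str(exception.args[0])
    return str(exception)

assert exception_message(KeyError('Variable FOO does not exist')) == \
    'Variable FOO does not exist'
{code}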





[jira] [Created] (AIRFLOW-1063) A manually-created DAG run can prevent a scheduled run from being created

2017-04-03 Thread Vitor Baptista (JIRA)
Vitor Baptista created AIRFLOW-1063:
---

 Summary: A manually-created DAG run can prevent a scheduled run from 
being created
 Key: AIRFLOW-1063
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1063
 Project: Apache Airflow
  Issue Type: Improvement
  Components: scheduler
Affects Versions: Airflow 1.7.1.3
Reporter: Vitor Baptista


I manually created a DAG Run with {{execution_date}} set to {{2017-03-01 
00:00:00}} on a monthly-recurring DAG. After a while, I noticed that the 
scheduled run was never created, and checked the scheduler's logs, finding this 
traceback:

{noformat}
Process Process-475397:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 664, in _do_dags
    dag = dagbag.get_dag(dag.dag_id)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 188, in get_dag
    orm_dag = DagModel.get_current(root_dag_id)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2320, in get_current
    obj = session.query(cls).filter(cls.dag_id == dag_id).first()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2690, in first
    ret = list(self[0:1])
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2482, in __getitem__
    return list(res)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2790, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2811, in _execute_and_instances
    close_with_result=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2820, in _get_bind_args
    **kw
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2802, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 966, in connection
    execution_options=execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 971, in _connection_for_bind
    engine, execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 382, in _connection_for_bind
    self._assert_active()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 276, in _assert_active
    % self._rollback_exception
InvalidRequestError: This Session's transaction has been rolled back due to a
previous exception during flush. To begin a new transaction with this Session,
first issue Session.rollback(). Original exception was: (psycopg2.IntegrityError)
duplicate key value violates unique constraint "dag_run_dag_id_execution_date_key"
DETAIL:  Key (dag_id, execution_date)=(nct, 2017-03-01 00:00:00) already exists.
 [SQL: 'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date,
state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s,
%(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s,
%(conf)s)
 RETURNING dag_run.id'] [parameters: {'end_date': None, 'run_id':
u'scheduled__2017-03-01T00:00:00', 'execution_date': datetime.datetime(2017, 3,
1, 0, 0), 'external_trigger': False, 'state': u'running', 'conf': None,
'start_date': datetime.datetime(2017, 4, 3, 13, 48, 39, 168456), 'dag_id': 'nct'}]
{noformat}

The problem is that the {{dag_run}} table requires the {{(dag_id, 
execution_date)}} pair to be unique, so the scheduler was stuck in a loop where 
it tried to create a new scheduled DAG run but failed, as I had already created 
one on the same {{execution_date}}. This was surprising. As a user, I would 
expect it either to schedule the run normally, even if there's a manual one on 
the same date, or to skip that execution date.
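
A guard along these lines in the scheduler would avoid the loop (a sketch only: 
{{dag}} and {{next_run_date}} are assumed to be in scope, and the real logic 
lives in {{jobs.py}}):

{code}
from airflow import settings
from airflow.models import DagRun

# Sketch: skip creating the scheduled run when any run (manual or scheduled)
# already exists for this (dag_id, execution_date) pair.
# `dag` and `next_run_date` are assumed to come from the scheduling loop.
session = settings.Session()
already_exists = (session.query(DagRun)
                  .filter(DagRun.dag_id == dag.dag_id,
                          DagRun.execution_date == next_run_date)
                  .count()) > 0
if not already_exists:
    dag.create_dagrun(
        run_id='scheduled__' + next_run_date.isoformat(),
        execution_date=next_run_date,
        state='running',  # the state string as stored in the DB
        external_trigger=False)
{code}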





[jira] [Assigned] (AIRFLOW-990) DockerOperator fails when logging unicode string

2017-03-15 Thread Vitor Baptista (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitor Baptista reassigned AIRFLOW-990:
--

Assignee: Vitor Baptista

> DockerOperator fails when logging unicode string
> 
>
> Key: AIRFLOW-990
> URL: https://issues.apache.org/jira/browse/AIRFLOW-990
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: docker
>Affects Versions: Airflow 1.7.1
> Environment: Python 2.7
>Reporter: Vitor Baptista
>Assignee: Vitor Baptista
>
> On line 
> https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/docker_operator.py#L164,
>  we're calling:
> {code:title=airflow/operators/docker_operator.py}
> for line in self.cli.logs(container=self.container['Id'], stream=True):
>     logging.info("{}".format(line.strip()))
> {code}
> If `self.cli.logs()` returns a string with a unicode character, this raises a 
> UnicodeDecodeError:
> {noformat}
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/logging/__init__.py", line 861, in emit
>     msg = self.format(record)
>   File "/usr/lib/python2.7/logging/__init__.py", line 734, in format
>     return fmt.format(record)
>   File "/usr/lib/python2.7/logging/__init__.py", line 476, in format
>     raise e
> UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 0: 
> ordinal not in range(128)
> Logged from file docker_operator.py, line 165
> {noformat}
> A possible fix is to change that line to:
> {code:title=airflow/operators/docker_operator.py}
> for line in self.cli.logs(container=self.container['Id'], stream=True):
>     logging.info(line.decode('utf-8').strip())
> {code}
> This error doesn't happen on Python 3. I haven't tested it, but reading the 
> code it seems the same error exists on `master` as well.
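
The failure is easy to reproduce outside Airflow. Under Python 2, formatting 
non-ASCII bytes into a unicode string triggers the same implicit ascii decode 
(a minimal sketch):

{code}
# Python 2 only: logs() yields byte strings; formatting them into a unicode
# template forces an implicit ascii decode of the bytes and fails.
line = '\xf0\x9f\x98\x80'    # UTF-8 bytes (an emoji), as logs() might return
print(line.decode('utf-8'))  # works: explicit decode
u"{}".format(line)           # UnicodeDecodeError: 'ascii' codec can't decode...
{code}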





[jira] [Commented] (AIRFLOW-990) DockerOperator fails when logging unicode string

2017-03-15 Thread Vitor Baptista (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927126#comment-15927126
 ] 

Vitor Baptista commented on AIRFLOW-990:


Pull request for this issue sent on 
https://github.com/apache/incubator-airflow/pull/2155






[jira] [Updated] (AIRFLOW-990) DockerOperator fails when logging unicode string

2017-03-15 Thread Vitor Baptista (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitor Baptista updated AIRFLOW-990:
---
Description: 
On line 
https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/docker_operator.py#L164,
 we're calling:

{code:title=airflow/operators/docker_operator.py}
for line in self.cli.logs(container=self.container['Id'], stream=True):
    logging.info("{}".format(line.strip()))
{code}

If `self.cli.logs()` returns a string with a unicode character, this raises a 
UnicodeDecodeError:
{noformat}
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 861, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 734, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 476, in format
    raise e
UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 0: ordinal 
not in range(128)
Logged from file docker_operator.py, line 165
{noformat}

A possible fix is to change that line to:
{code:title=airflow/operators/docker_operator.py}
for line in self.cli.logs(container=self.container['Id'], stream=True):
    logging.info(line.decode('utf-8').strip())
{code}

This error doesn't happen on Python 3. I haven't tested it, but reading the code 
it seems the same error exists on `master` as well.

  was:
On line 
https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/docker_operator.py#L164,
 we're calling:

{code:title=airflow/operators/docker_operator.py}
for line in self.cli.logs(container=self.container['Id'], stream=True):
    logging.info("{}".format(line.strip()))
{code}

If `self.cli.logs()` return a string with a unicode character, this raises the 
UnicodeDecodeError:
{noformat}
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 861, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 734, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 476, in format
    raise e
UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 0: ordinal 
not in range(128)
Logged from file docker_operator.py, line 165
{noformat}

A possible fix is to change that line to:
{code:python}
for line in self.cli.logs(container=self.container['Id'], stream=True):
    logging.info(line.decode('utf-8').strip())
{code}.

This error doesn't happen on Python3. I haven't tested, but reading the code it 
seems the same error exists on `master` as well.







[jira] [Updated] (AIRFLOW-990) DockerOperator fails when logging unicode string

2017-03-15 Thread Vitor Baptista (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitor Baptista updated AIRFLOW-990:
---
Description: 
On line 
https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/docker_operator.py#L164,
 we're calling:

{code:title=airflow/operators/docker_operator.py}
for line in self.cli.logs(container=self.container['Id'], stream=True):
    logging.info("{}".format(line.strip()))
{code}

If `self.cli.logs()` return a string with a unicode character, this raises the 
UnicodeDecodeError:
{noformat}
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 861, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 734, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 476, in format
    raise e
UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 0: ordinal 
not in range(128)
Logged from file docker_operator.py, line 165
{noformat}

A possible fix is to change that line to:
{code:python}
for line in self.cli.logs(container=self.container['Id'], stream=True):
    logging.info(line.decode('utf-8').strip())
{code}.

This error doesn't happen on Python3. I haven't tested, but reading the code it 
seems the same error exists on `master` as well.

  was:
On line 
https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/docker_operator.py#L164,
 we're calling:

{code:title=airflow/operators/docker_operator.py}
for line in self.cli.logs(container=self.container['Id'], stream=True):
    logging.info("{}".format(line.strip()))
{code}

If `self.cli.logs()` return a string with a unicode character, this raises the 
UnicodeDecodeError:
{preformat}
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 861, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 734, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 476, in format
    raise e
UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 0: ordinal 
not in range(128)
Logged from file docker_operator.py, line 165
{preformat}

A possible fix is to change that line to 
{code}logging.info(line.decode('utf-8').strip()){code}.

This error doesn't happen on Python3. I haven't tested, but reading the code it 
seems the same error exists on `master` as well.







[jira] [Updated] (AIRFLOW-990) DockerOperator fails when logging unicode string

2017-03-15 Thread Vitor Baptista (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitor Baptista updated AIRFLOW-990:
---
Description: 
On line 
https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/docker_operator.py#L164,
 we're calling:

{code:title=airflow/operators/docker_operator.py}
for line in self.cli.logs(container=self.container['Id'], stream=True):
    logging.info("{}".format(line.strip()))
{code}

If `self.cli.logs()` return a string with a unicode character, this raises the 
UnicodeDecodeError:
{preformat}
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 861, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 734, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 476, in format
    raise e
UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 0: ordinal 
not in range(128)
Logged from file docker_operator.py, line 165
{preformat}

A possible fix is to change that line to 
{code}logging.info(line.decode('utf-8').strip()){code}.

This error doesn't happen on Python3. I haven't tested, but reading the code it 
seems the same error exists on `master` as well.

  was:
On line 
https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/docker_operator.py#L164,
 we're calling:

```
for line in self.cli.logs(container=self.container['Id'], stream=True):
    logging.info("{}".format(line.strip()))
```

If `self.cli.logs()` return a string with a unicode character, this raises the 
UnicodeDecodeError:
```
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 861, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 734, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 476, in format
    raise e
UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 0: ordinal 
not in range(128)
Logged from file docker_operator.py, line 165
```

A possible fix is to change that line to 
`logging.info(line.decode('utf-8').strip())`.

This error doesn't happen on Python3. I haven't tested, but reading the code it 
seems the same error exists on `master` as well.







[jira] [Commented] (AIRFLOW-115) Migrate and Refactor AWS integration to use boto3 and better structured hooks

2017-02-10 Thread Vitor Baptista (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15861245#comment-15861245
 ] 

Vitor Baptista commented on AIRFLOW-115:


[~artwr] Are you working on this issue? I'm facing many errors when using the 
current `S3Hook`, mostly caused by it using `boto` instead of `boto3` (e.g. 
https://github.com/boto/boto/issues/2836).

> Migrate and Refactor AWS integration to use boto3 and better structured hooks
> -
>
> Key: AIRFLOW-115
> URL: https://issues.apache.org/jira/browse/AIRFLOW-115
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws, boto3, hooks
>Reporter: Arthur Wiedmer
>Assignee: Arthur Wiedmer
>Priority: Minor
>
> h2. Current State
> The current AWS integration is mostly done through the S3Hook, which uses 
> non-standard credentials parsing on top of using boto instead of boto3, the 
> currently supported AWS SDK for Python.
> h2. Proposal
> An AWSHook should be provided that maps Airflow connections to the boto3 API. 
> Operators working with S3, as well as other AWS services, would then inherit 
> from this hook but extend the functionality with service-specific methods 
> like get_key for S3, start_cluster for EMR, enqueue for SQS, send_email for 
> SES, etc. (a sketch follows the list below):
> * AWSHook
> ** S3Hook
> ** EMRHook
> ** SQSHook
> ** SESHook
> ...
>  
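
A minimal sketch of what that hierarchy could look like (class and method names 
here are illustrative assumptions, not a final API; resolving credentials from 
Airflow connections is left out):

{code}
import boto3

class AwsHook(object):
    """Generic hook mapping an Airflow connection to boto3 (sketch)."""
    def __init__(self, aws_conn_id='aws_default'):
        self.aws_conn_id = aws_conn_id

    def get_client(self, service_name):
        # A real implementation would build the session from the Airflow
        # connection; this sketch relies on boto3's default credential chain.
        return boto3.client(service_name)

class S3Hook(AwsHook):
    def get_key(self, key, bucket_name):
        # Service-specific convenience method layered on the generic hook.
        return self.get_client('s3').get_object(Bucket=bucket_name, Key=key)
{code}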





[jira] [Created] (AIRFLOW-854) Add OpenTrials to Airflow users

2017-02-09 Thread Vitor Baptista (JIRA)
Vitor Baptista created AIRFLOW-854:
--

 Summary: Add OpenTrials to Airflow users
 Key: AIRFLOW-854
 URL: https://issues.apache.org/jira/browse/AIRFLOW-854
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Vitor Baptista
Assignee: Vitor Baptista








[jira] [Commented] (AIRFLOW-565) DockerOperator doesn't work on Python3.4

2016-10-11 Thread Vitor Baptista (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15566109#comment-15566109
 ] 

Vitor Baptista commented on AIRFLOW-565:


Went ahead and sent a pull request for it on 
https://github.com/apache/incubator-airflow/pull/1832

> DockerOperator doesn't work on Python3.4
> 
>
> Key: AIRFLOW-565
> URL: https://issues.apache.org/jira/browse/AIRFLOW-565
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: docker, operators
>Affects Versions: Airflow 1.7.1.3
> Environment: Python 3.4.3
>Reporter: Vitor Baptista
>
> On {{DockerOperator.execute()}} we have:
> {code}
> if self.force_pull or len(self.cli.images(name=image)) == 0:
>     logging.info('Pulling docker image ' + image)
>     for l in self.cli.pull(image, stream=True):
>         output = json.loads(l)
>         logging.info("{}".format(output['status']))
> {code}
> https://github.com/apache/incubator-airflow/blob/master/airflow/operators/docker_operator.py#L152-L156
> The {{self.cli.pull()}} method returns {{bytes}} in Python3.4, and 
> {{json.loads()}} expects a string, so we end up with:
> {code}
> Traceback (most recent call last):
>   File "/home/vitor/Projetos/okfn/opentrials/airflow/env/lib/python3.4/site-packages/airflow/models.py", line 1245, in run
>     result = task_copy.execute(context=context)
>   File "/home/vitor/Projetos/okfn/opentrials/airflow/env/lib/python3.4/site-packages/airflow/operators/docker_operator.py", line 142, in execute
>     logging.info("{}".format(output['status']))
>   File "/usr/lib/python3.4/json/__init__.py", line 312, in loads
>     s.__class__.__name__))
> TypeError: the JSON object must be str, not 'bytes'
> {code}
> To avoid this, we could simply change it to {{output = 
> json.loads(l.decode('utf-8'))}}. This hardcodes the string as UTF-8, which 
> should be fine, considering the JSON spec requires the use of UTF-8, UTF-16 
> or UTF-32. As we're dealing with a Docker server, we can assume it'll be 
> well behaved.
> I'm happy to submit a pull request for this if you agree with the solution.
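
Put together, the loop from {{DockerOperator.execute()}} would become something 
like this (a sketch of the proposed change, not the merged patch; {{self}} and 
{{image}} are as in the snippet above):

{code}
import json
import logging

if self.force_pull or len(self.cli.images(name=image)) == 0:
    logging.info('Pulling docker image ' + image)
    for l in self.cli.pull(image, stream=True):
        # pull() streams bytes on Python 3; decode before json.loads(),
        # which only accepts str prior to Python 3.6.
        output = json.loads(l.decode('utf-8'))
        logging.info("{}".format(output['status']))
{code}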





[jira] [Assigned] (AIRFLOW-565) DockerOperator doesn't work on Python3.4

2016-10-11 Thread Vitor Baptista (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitor Baptista reassigned AIRFLOW-565:
--

Assignee: Vitor Baptista



