[ 
https://issues.apache.org/jira/browse/AIRFLOW-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fokko Driesprong resolved AIRFLOW-1157.
---------------------------------------
       Resolution: Fixed
    Fix Version/s: 2.0.0

Issue resolved by pull request #3002
[https://github.com/apache/incubator-airflow/pull/3002]

> Assigning a task to a pool that doesn't exist crashes the scheduler
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-1157
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1157
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: Airflow 1.8
>            Reporter: John Culver
>            Assignee: David Klosowski
>            Priority: Critical
>             Fix For: 2.0.0
>
>
> If a DAG contains a task assigned to a pool that doesn't exist, the
> scheduler will crash.
> Manually triggering a run of the following DAG on an environment without a
> pool named 'a_non_existent_pool' crashes the scheduler:
> {code}
> from datetime import datetime
>
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
>
> dag = DAG(dag_id='crash_scheduler',
>           start_date=datetime(2017, 1, 1),
>           schedule_interval=None)
>
> # This pool is never created, which is what brings the scheduler down.
> t1 = DummyOperator(task_id='crash',
>                    pool='a_non_existent_pool',
>                    dag=dag)
> {code}
> Here is the relevant log output on the scheduler:
> {noformat}
> [2017-04-27 19:31:24,816] {dag_processing.py:559} INFO - Processor for 
> /opt/airflow/dags/test-3.py finished
> [2017-04-27 19:31:24,817] {dag_processing.py:559} INFO - Processor for 
> /opt/airflow/dags/test_s3_file_move.py finished
> [2017-04-27 19:31:24,819] {dag_processing.py:627} INFO - Started a process 
> (PID: 124) to generate tasks for /opt/airflow/dags/crash_scheduler.py - 
> logging into /tmp/airflow/scheduler/logs/2017-04-27/crash_scheduler.py.log
> [2017-04-27 19:31:24,822] {dag_processing.py:627} INFO - Started a process 
> (PID: 125) to generate tasks for /opt/airflow/dags/configuration/constants.py 
> - logging into 
> /tmp/airflow/scheduler/logs/2017-04-27/configuration/constants.py.log
> [2017-04-27 19:31:24,847] {jobs.py:1007} INFO - Tasks up for execution:
>         <TaskInstance: move_s3_file_test.move_files 2017-04-27 
> 19:31:22.298893 [scheduled]>
> [2017-04-27 19:31:24,849] {jobs.py:1030} INFO - Figuring out tasks to run in 
> Pool(name=None) with 128 open slots and 1 task instances in queue
> [2017-04-27 19:31:24,856] {jobs.py:1078} INFO - DAG move_s3_file_test has 
> 0/16 running tasks
> [2017-04-27 19:31:24,856] {jobs.py:1105} INFO - Sending to executor 
> (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 
> 22, 298893)) with priority 1 and queue MVSANDBOX-airflow-DEV-dev
> [2017-04-27 19:31:24,859] {jobs.py:1116} INFO - Setting state of 
> (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 
> 22, 298893)) to queued
> [2017-04-27 19:31:24,867] {base_executor.py:50} INFO - Adding to queue: 
> airflow run move_s3_file_test move_files 2017-04-27T19:31:22.298893 --local 
> -sd /opt/airflow/dags/test_s3_file_move.py
> [2017-04-27 19:31:24,867] {jobs.py:1440} INFO - Heartbeating the executor
> [2017-04-27 19:31:24,872] {celery_executor.py:78} INFO - [celery] queuing 
> (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 
> 22, 298893)) through celery, queue=MVSANDBOX-airflow-DEV-dev
> [2017-04-27 19:31:25,974] {jobs.py:1404} INFO - Heartbeating the process 
> manager
> [2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for 
> /opt/airflow/dags/crash_scheduler.py finished
> [2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for 
> /opt/airflow/dags/configuration/constants.py finished
> [2017-04-27 19:31:25,977] {dag_processing.py:627} INFO - Started a process 
> (PID: 128) to generate tasks for /opt/airflow/dags/example_s3_sensor.py - 
> logging into /tmp/airflow/scheduler/logs/2017-04-27/example_s3_sensor.py.log
> [2017-04-27 19:31:25,980] {dag_processing.py:627} INFO - Started a process 
> (PID: 129) to generate tasks for /opt/airflow/dags/test-4.py - logging into 
> /tmp/airflow/scheduler/logs/2017-04-27/test-4.py.log
> [2017-04-27 19:31:26,004] {jobs.py:1007} INFO - Tasks up for execution:
>         <TaskInstance: crash_scheduler.crash 2017-04-27 19:30:51.948542 
> [scheduled]>
> [2017-04-27 19:31:26,006] {jobs.py:1311} INFO - Exited execute loop
> [2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 128
> [2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 129
> [2017-04-27 19:31:26,008] {jobs.py:1329} INFO - Waiting up to 5s for 
> processes to exit...
> Traceback (most recent call last):
>   File "/usr/bin/airflow", line 28, in <module>
>     args.func(args)
>   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 839, in 
> scheduler
>     job.run()
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 200, in run
>     self._execute()
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1309, in 
> _execute
>     self._execute_helper(processor_manager)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1437, in 
> _execute_helper
>     (State.SCHEDULED,))
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
>     result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1025, in 
> _execute_task_instances
>     open_slots = pools[pool].open_slots(session=session)
> KeyError: u'a_non_existant_pool'
> {noformat}
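
The traceback points at the unguarded dictionary lookup
pools[pool].open_slots(session=session) in the scheduler's
_execute_task_instances: a task referencing a pool name with no row in the
pool table raises KeyError and takes the whole scheduler down. Below is a
minimal sketch of the kind of guard that avoids this class of crash; it is
illustrative only, not necessarily the change made in pull request #3002, and
the helper name open_slots_for is invented for the example.

{code}
import logging

log = logging.getLogger(__name__)


def open_slots_for(pools, pool_name, session):
    """Return the number of open slots for pool_name.

    Treat an unknown pool as having zero open slots and log a warning,
    instead of letting the KeyError propagate and kill the scheduler.
    """
    if pool_name not in pools:
        log.warning("Tasks reference non-existent pool %r; treating it as full.",
                    pool_name)
        return 0
    return pools[pool_name].open_slots(session=session)
{code}

Until a scheduler running an affected version is upgraded, the practical
workaround is to create the pool before any task references it. A minimal
sketch using the Airflow 1.x ORM (the Pool model and settings.Session are the
names used in 1.8; the slot count of 4 is arbitrary):

{code}
from airflow import settings
from airflow.models import Pool

session = settings.Session()
# Create the pool only if a row with that name does not already exist.
if not session.query(Pool).filter(Pool.pool == 'a_non_existent_pool').first():
    session.add(Pool(pool='a_non_existent_pool', slots=4))
    session.commit()
session.close()
{code}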



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
