Repository: incubator-airflow Updated Branches: refs/heads/master 0be87d5c4 -> 51acc5087
[AIRFLOW-679] Stop concurrent task instances from running. Check that the recorded hostname and PID remain unchanged, and throw an exception otherwise. Testing Done: - Ran a task, set its PID to a different value, and ensured that it failed. If there's a connection error while heartbeating, it should retry. Also, if it hasn't been able to heartbeat for a while, it should kill the child processes so that we don't have two instances of the same task running. Closes #1939 from saguziel/consistency Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/51acc508 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/51acc508 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/51acc508 Branch: refs/heads/master Commit: 51acc50875ee64af135caaf0583f51ef3868cda3 Parents: 0be87d5 Author: Alex Guziel <alex.guz...@airbnb.com> Authored: Fri Dec 16 13:28:02 2016 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Fri Dec 16 13:28:07 2016 -0800 ---------------------------------------------------------------------- airflow/jobs.py | 20 +++++++++-- ...e7d17757c7a_add_pid_field_to_taskinstance.py | 37 ++++++++++++++++++++ airflow/models.py | 1 + 3 files changed, 56 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/51acc508/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index a2d94e3..81c77a8 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -2005,6 +2005,13 @@ class LocalTaskJob(BaseJob): ) self.process = subprocess.Popen(['bash', '-c', command]) self.logger.info("Subprocess PID is {}".format(self.process.pid)) + ti = self.task_instance + session = settings.Session() + ti.pid = self.process.pid + ti.hostname = socket.getfqdn() + session.merge(ti) + session.commit() + session.close() last_heartbeat_time = 
time.time() heartbeat_time_limit = conf.getint('scheduler', @@ -2054,11 +2061,20 @@ class LocalTaskJob(BaseJob): # Suicide pill TI = models.TaskInstance ti = self.task_instance - state = session.query(TI.state).filter( + new_ti = session.query(TI).filter( TI.dag_id==ti.dag_id, TI.task_id==ti.task_id, TI.execution_date==ti.execution_date).scalar() - if state == State.RUNNING: + if new_ti.state == State.RUNNING: self.was_running = True + fqdn = socket.getfqdn() + if not (fqdn == new_ti.hostname and self.process.pid == new_ti.pid): + logging.warning("Recorded hostname and pid of {new_ti.hostname} " + "and {new_ti.pid} do not match this instance's " + "which are {fqdn} and " + "{self.process.pid}. Taking the poison pill. So " + "long." + .format(**locals())) + raise AirflowException("Another worker/process is running this job") elif self.was_running and hasattr(self, 'process'): logging.warning( "State of this instance has been externally set to " http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/51acc508/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py ---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py b/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py new file mode 100644 index 0000000..7146864 --- /dev/null +++ b/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py @@ -0,0 +1,37 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +"""add pid field to TaskInstance + +Revision ID: 5e7d17757c7a +Revises: 8504051e801b +Create Date: 2016-12-07 15:51:37.119478 + +""" + +# revision identifiers, used by Alembic. +revision = '5e7d17757c7a' +down_revision = '8504051e801b' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('task_instance', sa.Column('pid', sa.Integer)) + + +def downgrade(): + op.drop_column('task_instance', 'pid') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/51acc508/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index f46a352..5d7075d 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -733,6 +733,7 @@ class TaskInstance(Base): priority_weight = Column(Integer) operator = Column(String(1000)) queued_dttm = Column(DateTime) + pid = Column(Integer) __table_args__ = ( Index('ti_dag_state', dag_id, state),