This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new adce6f0 Use Hash of Serialized DAG to determine DAG is changed or not (#10227) adce6f0 is described below commit adce6f029609e89f3651a89df40700589ec16237 Author: Kaxil Naik <kaxiln...@gmail.com> AuthorDate: Tue Aug 11 22:31:55 2020 +0100 Use Hash of Serialized DAG to determine DAG is changed or not (#10227) closes #10116 --- ...c3a5a_add_dag_hash_column_to_serialized_dag_.py | 46 ++++++++++++++++++++++ airflow/models/serialized_dag.py | 11 ++++-- tests/models/test_serialized_dag.py | 17 ++++---- 3 files changed, 62 insertions(+), 12 deletions(-) diff --git a/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py b/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py new file mode 100644 index 0000000..4dbc77a --- /dev/null +++ b/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add dag_hash Column to serialized_dag table + +Revision ID: da3f683c3a5a +Revises: 8d48763f6d53 +Create Date: 2020-08-07 20:52:09.178296 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = 'da3f683c3a5a' +down_revision = '8d48763f6d53' +branch_labels = None +depends_on = None + + +def upgrade(): + """Apply Add dag_hash Column to serialized_dag table""" + op.add_column( + 'serialized_dag', + sa.Column('dag_hash', sa.String(32), nullable=False, server_default='Hash not calculated yet')) + + +def downgrade(): + """Unapply Add dag_hash Column to serialized_dag table""" + op.drop_column('serialized_dag', 'dag_hash') diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 078724d..2368b9e 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -18,6 +18,7 @@ """Serialized DAG table in database.""" +import hashlib import logging from datetime import datetime, timedelta from typing import Any, Dict, List, Optional @@ -53,7 +54,7 @@ class SerializedDagModel(Base): interval of deleting serialized DAGs in DB when the files are deleted, suggest to use a smaller interval such as 60 - It is used by webserver to load dagbags when ``store_serialized_dags=True``. + It is used by webserver to load dags when ``store_serialized_dags=True``. Because reading from database is lightweight compared to importing from files, it solves the webserver scalability issue. """ @@ -65,6 +66,7 @@ class SerializedDagModel(Base): fileloc_hash = Column(BigInteger, nullable=False) data = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False) last_updated = Column(UtcDateTime, nullable=False) + dag_hash = Column(String(32), nullable=False) __table_args__ = ( Index('idx_fileloc_hash', fileloc_hash, unique=False), @@ -76,6 +78,7 @@ class SerializedDagModel(Base): self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc) self.data = SerializedDAG.to_dict(dag) self.last_updated = timezone.utcnow() + self.dag_hash = hashlib.md5(json.dumps(self.data, sort_keys=True).encode("utf-8")).hexdigest() def __repr__(self): return f"<SerializedDag: {self.dag_id}>" @@ -102,9 +105,11 @@ class SerializedDagModel(Base): return log.debug("Checking if DAG (%s) changed", dag.dag_id) - serialized_dag_from_db: SerializedDagModel = session.query(cls).get(dag.dag_id) new_serialized_dag = cls(dag) - if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data): + serialized_dag_hash_from_db = session.query( + cls.dag_hash).filter(cls.dag_id == dag.dag_id).scalar() + + if serialized_dag_hash_from_db == new_serialized_dag.dag_hash: log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id) return diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index 79e71a8..5e4e9cd 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -84,27 +84,26 @@ class SerializedDagModelTest(unittest.TestCase): SDM.write_dag(dag=example_bash_op_dag) with create_session() as session: - last_updated = session.query( - SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none() + s_dag = session.query(SDM).get(example_bash_op_dag.dag_id) # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated # column is not updated SDM.write_dag(dag=example_bash_op_dag) - last_updated_1 = session.query( - SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none() + s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id) - self.assertEqual(last_updated, last_updated_1) + self.assertEqual(s_dag_1.dag_hash, s_dag.dag_hash) + self.assertEqual(s_dag.last_updated, s_dag_1.last_updated) # Update DAG example_bash_op_dag.tags += ["new_tag"] self.assertCountEqual(example_bash_op_dag.tags, ["example", "new_tag"]) SDM.write_dag(dag=example_bash_op_dag) - new_s_dag = session.query(SDM.last_updated, SDM.data).filter( - SDM.dag_id == example_bash_op_dag.dag_id).one_or_none() + s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id) - self.assertNotEqual(last_updated, new_s_dag.last_updated) - self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", "new_tag"]) + self.assertNotEqual(s_dag.last_updated, s_dag_2.last_updated) + self.assertNotEqual(s_dag.dag_hash, s_dag_2.dag_hash) + self.assertEqual(s_dag_2.data["dag"]["tags"], ["example", "new_tag"]) def test_read_dags(self): """DAGs can be read from database."""