Repository: incubator-airflow
Updated Branches:
  refs/heads/master fedc5a092 -> b1deb3318


[AIRFLOW-2065] Fix race-conditions when creating loggers

If two or more workers start at the same time, they execute the same
operations to create the output directories for storing the log files.
This can lead to a race condition: for example, worker A creates the
directory right after worker B has checked that it does not exist, so
worker B also tries to create the directory even though it already
exists, and os.makedirs raises OSError (EEXIST).
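
The check-then-create pattern described above can be made tolerant of a
concurrent creator roughly as follows. This is a minimal sketch, not the
code from this commit: ensure_directory is a hypothetical helper name, and
the extra os.path.isdir check is an assumption added here so that a regular
file occupying the path is still reported as an error.

```python
import errno
import os


def ensure_directory(path):
    """Create `path` and any missing parents, tolerating a concurrent creator.

    Hypothetical helper for illustration only; it is not part of Airflow.
    """
    try:
        os.makedirs(path)
    except OSError as e:
        # Ignore only the case where another worker created the directory
        # first. EEXIST is also raised when a non-directory occupies the
        # path, so that case is re-raised as well (an assumption of this
        # sketch, not something the commit does).
        if e.errno != errno.EEXIST or not os.path.isdir(path):
            raise
```

Attempting the creation and inspecting the error avoids the window between
the existence check and the os.makedirs call. On Python 3.2 and later,
os.makedirs(path, exist_ok=True) provides a similar tolerance directly.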

Closes #3040 from blinkseb/fix-airflow-2065


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b1deb331
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b1deb331
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b1deb331

Branch: refs/heads/master
Commit: b1deb3318f3fae4e21860bbd6e6463ef644aea8d
Parents: fedc5a0
Author: Sébastien Brochet <sebastien.broc...@holimetrix.com>
Authored: Fri Mar 2 12:38:03 2018 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Fri Mar 2 12:38:03 2018 +0100

----------------------------------------------------------------------
 airflow/utils/log/file_processor_handler.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b1deb331/airflow/utils/log/file_processor_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py
index 176e316..f6d8d93 100644
--- a/airflow/utils/log/file_processor_handler.py
+++ b/airflow/utils/log/file_processor_handler.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import errno
 import logging
 import os
 
@@ -45,7 +46,14 @@ class FileProcessorHandler(logging.Handler):
 
         self._cur_date = datetime.today()
         if not os.path.exists(self._get_log_directory()):
-            os.makedirs(self._get_log_directory())
+            try:
+                os.makedirs(self._get_log_directory())
+            except OSError as e:
+                # only ignore case where the directory already exist
+                if e.errno != errno.EEXIST:
+                    raise
+
+                logging.warning("%s already exists", self._get_log_directory())
 
         self._symlink_latest_log_directory()
 
