Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3fa55db90 -> e95a1251b


[AIRFLOW-2240][DASK] Added TLS/SSL support for the dask-distributed scheduler.

As of 0.17.0 dask distributed has support for
TLS/SSL.

[dask] Added TLS/SSL support for the dask-
distributed scheduler.

As of 0.17.0 dask distributed has support for
TLS/SSL.

Add a test for tls under dask distributed

Closes #2683 from mariusvniekerk/dask-ssl


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e95a1251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e95a1251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e95a1251

Branch: refs/heads/master
Commit: e95a1251b746ac74a47b3ccae52d4abdc26add82
Parents: 3fa55db
Author: Marius van Niekerk <mvanniek...@flatiron.com>
Authored: Wed Apr 18 09:45:46 2018 -0700
Committer: Arthur Wiedmer <awied...@netflix.com>
Committed: Wed Apr 18 09:45:52 2018 -0700

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg |  4 ++
 airflow/executors/dask_executor.py           | 16 +++++-
 setup.py                                     |  2 +-
 tests/executors/dask_executor.py             | 61 +++++++++++++++++++----
 4 files changed, 72 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e95a1251/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index bb3793b..400bcc0 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -364,6 +364,10 @@ ssl_cacert =
 
 # The IP address and port of the Dask cluster's scheduler.
 cluster_address = 127.0.0.1:8786
+# TLS/ SSL settings to access a secured Dask scheduler.
+tls_ca =
+tls_cert =
+tls_key =
 
 
 [scheduler]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e95a1251/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py 
b/airflow/executors/dask_executor.py
index 0d914ba..17ace55 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -36,10 +36,24 @@ class DaskExecutor(BaseExecutor):
             raise ValueError(
                 'Please provide a Dask cluster address in airflow.cfg')
         self.cluster_address = cluster_address
+        # ssl / tls parameters
+        self.tls_ca = configuration.get('dask', 'tls_ca')
+        self.tls_key = configuration.get('dask', 'tls_key')
+        self.tls_cert = configuration.get('dask', 'tls_cert')
         super(DaskExecutor, self).__init__(parallelism=0)
 
     def start(self):
-        self.client = distributed.Client(self.cluster_address)
+        if (self.tls_ca) or (self.tls_key) or (self.tls_cert):
+            from distributed.security import Security
+            security = Security(
+                tls_client_key=self.tls_key,
+                tls_client_cert=self.tls_cert,
+                tls_ca_file=self.tls_ca,
+            )
+        else:
+            security = None
+
+        self.client = distributed.Client(self.cluster_address, 
security=security)
         self.futures = {}
 
     def execute_async(self, key, command, queue=None):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e95a1251/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 700f3ae..9709ddb 100644
--- a/setup.py
+++ b/setup.py
@@ -112,7 +112,7 @@ cgroups = [
 ]
 crypto = ['cryptography>=0.9.3']
 dask = [
-    'distributed>=1.15.2, <2'
+    'distributed>=1.17.1, <2'
     ]
 databricks = ['requests>=2.5.1, <3']
 datadog = ['datadog>=0.14.0']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e95a1251/tests/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py
index 40796bc..7937b1b 100644
--- a/tests/executors/dask_executor.py
+++ b/tests/executors/dask_executor.py
@@ -29,6 +29,13 @@ from datetime import timedelta
 try:
     from airflow.executors.dask_executor import DaskExecutor
     from distributed import LocalCluster
+    # utility functions imported from the dask testing suite to instantiate a 
test
+    # cluster for tls tests
+    from distributed.utils_test import (
+        get_cert,
+        cluster as dask_testing_cluster,
+        tls_security,
+    )
     SKIP_DASK = False
 except ImportError:
     SKIP_DASK = True
@@ -42,16 +49,9 @@ SKIP_DASK = True
 DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 
-class DaskExecutorTest(unittest.TestCase):
-
-    def setUp(self):
-        self.dagbag = DagBag(include_examples=True)
-        self.cluster = LocalCluster()
-
-    @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
-    def test_dask_executor_functions(self):
-        executor = DaskExecutor(cluster_address=self.cluster.scheduler_address)
+class BaseDaskTest(unittest.TestCase):
 
+    def assert_tasks_on_executor(self, executor):
         # start the executor
         executor.start()
 
@@ -82,6 +82,18 @@ class DaskExecutorTest(unittest.TestCase):
         self.assertTrue(success_future.exception() is None)
         self.assertTrue(fail_future.exception() is not None)
 
+
+class DaskExecutorTest(BaseDaskTest):
+
+    def setUp(self):
+        self.dagbag = DagBag(include_examples=True)
+        self.cluster = LocalCluster()
+
+    @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
+    def test_dask_executor_functions(self):
+        executor = DaskExecutor(cluster_address=self.cluster.scheduler_address)
+        self.assert_tasks_on_executor(executor)
+
     @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
     def test_backfill_integration(self):
         """
@@ -112,3 +124,34 @@ class DaskExecutorTest(unittest.TestCase):
 
     def tearDown(self):
         self.cluster.close(timeout=5)
+
+
+class DaskExecutorTLSTest(BaseDaskTest):
+
+    def setUp(self):
+        self.dagbag = DagBag(include_examples=True)
+
+    @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
+    def test_tls(self):
+        with dask_testing_cluster(
+                worker_kwargs={'security': tls_security()},
+                scheduler_kwargs={'security': tls_security()}) as (s, workers):
+
+            # These use test certs that ship with dask/distributed and should 
not be
+            #  used in production
+            configuration.set('dask', 'tls_ca', get_cert('tls-ca-cert.pem'))
+            configuration.set('dask', 'tls_cert', get_cert('tls-key-cert.pem'))
+            configuration.set('dask', 'tls_key', get_cert('tls-key.pem'))
+            try:
+                executor = DaskExecutor(cluster_address=s['address'])
+
+                self.assert_tasks_on_executor(executor)
+
+                executor.end()
+                # close the executor, the cluster context manager expects all 
listeners
+                # and tasks to have completed.
+                executor.client.close()
+            finally:
+                configuration.set('dask', 'tls_ca', '')
+                configuration.set('dask', 'tls_key', '')
+                configuration.set('dask', 'tls_cert', '')

Reply via email to