Repository: incubator-airflow Updated Branches: refs/heads/master 3fa55db90 -> e95a1251b
[AIRFLOW-2240][DASK] Added TLS/SSL support for the dask-distributed scheduler. As of 0.17.0 dask distributed has support for TLS/SSL. [dask] Added TLS/SSL support for the dask-distributed scheduler. As of 0.17.0 dask distributed has support for TLS/SSL. Add a test for tls under dask distributed Closes #2683 from mariusvniekerk/dask-ssl Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e95a1251 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e95a1251 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e95a1251 Branch: refs/heads/master Commit: e95a1251b746ac74a47b3ccae52d4abdc26add82 Parents: 3fa55db Author: Marius van Niekerk <mvanniek...@flatiron.com> Authored: Wed Apr 18 09:45:46 2018 -0700 Committer: Arthur Wiedmer <awied...@netflix.com> Committed: Wed Apr 18 09:45:52 2018 -0700 ---------------------------------------------------------------------- airflow/config_templates/default_airflow.cfg | 4 ++ airflow/executors/dask_executor.py | 16 +++++- setup.py | 2 +- tests/executors/dask_executor.py | 61 +++++++++++++++++++---- 4 files changed, 72 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e95a1251/airflow/config_templates/default_airflow.cfg ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bb3793b..400bcc0 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -364,6 +364,10 @@ ssl_cacert = # The IP address and port of the Dask cluster's scheduler. cluster_address = 127.0.0.1:8786 +# TLS/ SSL settings to access a secured Dask scheduler. 
+tls_ca = +tls_cert = +tls_key = [scheduler] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e95a1251/airflow/executors/dask_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py index 0d914ba..17ace55 100644 --- a/airflow/executors/dask_executor.py +++ b/airflow/executors/dask_executor.py @@ -36,10 +36,24 @@ class DaskExecutor(BaseExecutor): raise ValueError( 'Please provide a Dask cluster address in airflow.cfg') self.cluster_address = cluster_address + # ssl / tls parameters + self.tls_ca = configuration.get('dask', 'tls_ca') + self.tls_key = configuration.get('dask', 'tls_key') + self.tls_cert = configuration.get('dask', 'tls_cert') super(DaskExecutor, self).__init__(parallelism=0) def start(self): - self.client = distributed.Client(self.cluster_address) + if (self.tls_ca) or (self.tls_key) or (self.tls_cert): + from distributed.security import Security + security = Security( + tls_client_key=self.tls_key, + tls_client_cert=self.tls_cert, + tls_ca_file=self.tls_ca, + ) + else: + security = None + + self.client = distributed.Client(self.cluster_address, security=security) self.futures = {} def execute_async(self, key, command, queue=None): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e95a1251/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 700f3ae..9709ddb 100644 --- a/setup.py +++ b/setup.py @@ -112,7 +112,7 @@ cgroups = [ ] crypto = ['cryptography>=0.9.3'] dask = [ - 'distributed>=1.15.2, <2' + 'distributed>=1.17.1, <2' ] databricks = ['requests>=2.5.1, <3'] datadog = ['datadog>=0.14.0'] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e95a1251/tests/executors/dask_executor.py ---------------------------------------------------------------------- diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py index 
40796bc..7937b1b 100644 --- a/tests/executors/dask_executor.py +++ b/tests/executors/dask_executor.py @@ -29,6 +29,13 @@ from datetime import timedelta try: from airflow.executors.dask_executor import DaskExecutor from distributed import LocalCluster + # utility functions imported from the dask testing suite to instantiate a test + # cluster for tls tests + from distributed.utils_test import ( + get_cert, + cluster as dask_testing_cluster, + tls_security, + ) SKIP_DASK = False except ImportError: SKIP_DASK = True @@ -42,16 +49,9 @@ SKIP_DASK = True DEFAULT_DATE = timezone.datetime(2017, 1, 1) -class DaskExecutorTest(unittest.TestCase): - - def setUp(self): - self.dagbag = DagBag(include_examples=True) - self.cluster = LocalCluster() - - @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration') - def test_dask_executor_functions(self): - executor = DaskExecutor(cluster_address=self.cluster.scheduler_address) +class BaseDaskTest(unittest.TestCase): + def assert_tasks_on_executor(self, executor): # start the executor executor.start() @@ -82,6 +82,18 @@ class DaskExecutorTest(unittest.TestCase): self.assertTrue(success_future.exception() is None) self.assertTrue(fail_future.exception() is not None) + +class DaskExecutorTest(BaseDaskTest): + + def setUp(self): + self.dagbag = DagBag(include_examples=True) + self.cluster = LocalCluster() + + @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration') + def test_dask_executor_functions(self): + executor = DaskExecutor(cluster_address=self.cluster.scheduler_address) + self.assert_tasks_on_executor(executor) + @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration') def test_backfill_integration(self): """ @@ -112,3 +124,34 @@ class DaskExecutorTest(unittest.TestCase): def tearDown(self): self.cluster.close(timeout=5) + + +class DaskExecutorTLSTest(BaseDaskTest): + + def setUp(self): + self.dagbag = DagBag(include_examples=True) + + @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this 
configuration') + def test_tls(self): + with dask_testing_cluster( + worker_kwargs={'security': tls_security()}, + scheduler_kwargs={'security': tls_security()}) as (s, workers): + + # These use test certs that ship with dask/distributed and should not be + # used in production + configuration.set('dask', 'tls_ca', get_cert('tls-ca-cert.pem')) + configuration.set('dask', 'tls_cert', get_cert('tls-key-cert.pem')) + configuration.set('dask', 'tls_key', get_cert('tls-key.pem')) + try: + executor = DaskExecutor(cluster_address=s['address']) + + self.assert_tasks_on_executor(executor) + + executor.end() + # close the executor, the cluster context manager expects all listeners + # and tasks to have completed. + executor.client.close() + finally: + configuration.set('dask', 'tls_ca', '') + configuration.set('dask', 'tls_key', '') + configuration.set('dask', 'tls_cert', '')