[FLINK-3932] Added ZK ACL configuration for secure cluster setup

This closes #2589.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5bd47012
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5bd47012
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5bd47012

Branch: refs/heads/master
Commit: 5bd47012e50516d45a6b50d47f347e6802dfde4c
Parents: 85b5344
Author: Vijay Srinivasaraghavan <vijayaraghavan.srinivasaragha...@emc.com>
Authored: Thu Sep 22 10:10:01 2016 -0700
Committer: Maximilian Michels <m...@apache.org>
Committed: Fri Oct 14 16:36:30 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  2 +
 .../flink/configuration/ConfigConstants.java    |  7 ++
 flink-dist/src/main/resources/flink-conf.yaml   |  6 ++
 .../flink/runtime/util/ZooKeeperUtils.java      | 70 ++++++++++++++++++++
 4 files changed, 85 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5bd47012/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 3f6b705..0c8f451 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -332,6 +332,8 @@ Previously this key was named `recovery.mode` and the 
default value was `standal
 
 - `high-availability.job.delay`: (Default `akka.ask.timeout`) Defines the 
delay before persisted jobs are recovered in case of a master recovery 
situation. Previously this key was named `recovery.job.delay`.
 
+- `high-availability.zookeeper.client.acl`: (Default `open`) Defines the ACL 
(open|creator) to be configured on the ZooKeeper nodes. The value can be set 
to "creator" if the ZooKeeper server configuration has the "authProvider" 
property mapped to use SASLAuthenticationProvider and the cluster is configured 
to run in secure mode (Kerberos). The ACL options are based on 
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+
 ### ZooKeeper-Security
 
 - `zookeeper.sasl.disable`: (Default: `true`) Defines if SASL based 
authentication needs to be enabled or disabled. The configuration value can be 
set to "true" if ZooKeeper cluster is running in secure mode (Kerberos)

http://git-wip-us.apache.org/repos/asf/flink/blob/5bd47012/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a64b631..e608eb3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -759,6 +759,9 @@ public final class ConfigConstants {
        public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = 
"high-availability.zookeeper.client.max-retry-attempts";
 
        @PublicEvolving
+       public static final String HA_ZOOKEEPER_CLIENT_ACL = 
"high-availability.zookeeper.client.acl";
+
+       @PublicEvolving
        public static final String ZOOKEEPER_SASL_DISABLE = 
"zookeeper.sasl.disable";
 
        @PublicEvolving
@@ -1268,6 +1271,10 @@ public final class ConfigConstants {
        /** Defaults for ZK client security **/
        public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true;
 
+       /** Supported ACL options: "creator" or "open". */
+       public static final String DEFAULT_HA_ZOOKEEPER_CLIENT_ACL = "open";
+
+
        // ------------------------- Queryable state 
------------------------------
 
        /** Port to bind KvState server to. */

http://git-wip-us.apache.org/repos/asf/flink/blob/5bd47012/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml 
b/flink-dist/src/main/resources/flink-conf.yaml
index c876922..ad916e8 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -144,6 +144,12 @@ jobmanager.web.port: 8081
 # high-availability.zookeeper.quorum: localhost:2181
 # high-availability.zookeeper.storageDir: hdfs:///flink/ha/
 
+# ACL options are based on 
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# The value can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" 
(ZOO_OPEN_ACL_UNSAFE).
+# The default value is "open"; it can be changed to "creator" if ZK 
security is enabled.
+#
+# high-availability.zookeeper.client.acl: open
+
 #==============================================================================
 # Flink Cluster Security Configuration (optional configuration)
 #==============================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/5bd47012/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 67fc397..7862f87 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.util;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.DefaultACLProvider;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
@@ -39,11 +41,14 @@ import 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import 
org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
 import org.apache.flink.util.ConfigurationUtil;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -79,6 +84,29 @@ public class ZooKeeperUtils {
 
                String namespace = 
configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
 
+               boolean disableSaslClient = 
configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
+                               ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
+
+               ACLProvider aclProvider;
+
+               ZkClientACLMode aclMode = 
ZkClientACLMode.fromConfig(configuration);
+
+               if(disableSaslClient && aclMode == ZkClientACLMode.CREATOR) {
+                       String errorMessage = "Cannot set ACL role to " + 
aclMode +"  since SASL authentication is " +
+                                       "disabled through the " + 
ConfigConstants.ZOOKEEPER_SASL_DISABLE + " property";
+                       LOG.warn(errorMessage);
+                       throw new IllegalConfigurationException(errorMessage);
+               }
+
+               if(aclMode == ZkClientACLMode.CREATOR) {
+                       LOG.info("Enforcing creator for ZK connections");
+                       aclProvider = new SecureAclProvider();
+               } else {
+                       LOG.info("Enforcing default ACL for ZK connections");
+                       aclProvider = new DefaultACLProvider();
+               }
+
+
                String rootWithNamespace = generateZookeeperPath(root, 
namespace);
 
                LOG.info("Using '{}' as Zookeeper namespace.", 
rootWithNamespace);
@@ -91,6 +119,7 @@ public class ZooKeeperUtils {
                                // Curator prepends a '/' manually and throws 
an Exception if the
                                // namespace starts with a '/'.
                                .namespace(rootWithNamespace.startsWith("/") ? 
rootWithNamespace.substring(1) : rootWithNamespace)
+                               .aclProvider(aclProvider)
                                .build();
 
                cf.start();
@@ -306,6 +335,47 @@ public class ZooKeeperUtils {
                return root + namespace;
        }
 
+
+       public static class SecureAclProvider implements ACLProvider
+       {
+               @Override
+               public List<ACL> getDefaultAcl()
+               {
+                       return ZooDefs.Ids.CREATOR_ALL_ACL;
+               }
+
+               @Override
+               public List<ACL> getAclForPath(String path)
+               {
+                       return ZooDefs.Ids.CREATOR_ALL_ACL;
+               }
+       }
+
+       public enum ZkClientACLMode {
+               CREATOR,
+               OPEN;
+
+               /**
+                * Return the configured {@link ZkClientACLMode}.
+                *
+                * @param config The config to parse
+                * @return Configured ACL mode or {@link 
ConfigConstants#DEFAULT_HA_ZOOKEEPER_CLIENT_ACL} if not
+                * configured.
+                */
+               public static ZkClientACLMode fromConfig(Configuration config) {
+                       String aclMode = 
config.getString(ConfigConstants.HA_ZOOKEEPER_CLIENT_ACL, null);
+                       if (aclMode == null || 
aclMode.equalsIgnoreCase(ZkClientACLMode.OPEN.name())) {
+                               return ZkClientACLMode.OPEN;
+                       } else if 
(aclMode.equalsIgnoreCase(ZkClientACLMode.CREATOR.name())) {
+                               return ZkClientACLMode.CREATOR;
+                       } else {
+                               String message = "Unsupported ACL option: [" + 
aclMode + "] provided";
+                               LOG.error(message);
+                               throw new 
IllegalConfigurationException(message);
+                       }
+               }
+       }
+
        /**
         * Private constructor to prevent instantiation.
         */

Reply via email to