Repository: storm
Updated Branches:
  refs/heads/1.x-branch e0b1333d0 -> c814b7fad


STORM-2038: Disable symlinks with a config option


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f9bfb210
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f9bfb210
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f9bfb210

Branch: refs/heads/1.x-branch
Commit: f9bfb2107ca6753c0fcf51c7381df7166eac0fd8
Parents: 825e7f3
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 27 15:06:43 2017 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 16 13:07:54 2017 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |  1 +
 docs/windows-users-guide.md                     |  9 ++++-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  7 +++-
 storm-core/src/jvm/org/apache/storm/Config.java |  9 +++++
 .../storm/daemon/supervisor/AdvancedFSOps.java  | 16 ++++++--
 .../storm/daemon/supervisor/Container.java      | 41 +++++++++++++-------
 .../apache/storm/localizer/AsyncLocalizer.java  | 32 ++++++++-------
 .../org/apache/storm/localizer/Localizer.java   |  8 +++-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 10 +++++
 9 files changed, 97 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e1b7a3c..f89211b 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -58,6 +58,7 @@ storm.codedistributor.class: 
"org.apache.storm.codedistributor.LocalFileSystemCo
 storm.workers.artifacts.dir: "workers-artifacts"
 storm.health.check.dir: "healthchecks"
 storm.health.check.timeout.ms: 5000
+storm.disable.symlinks: false
 
 ### nimbus.* configs are for the master
 nimbus.seeds : ["localhost"]

http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/docs/windows-users-guide.md
----------------------------------------------------------------------
diff --git a/docs/windows-users-guide.md b/docs/windows-users-guide.md
index 4ebd929..9c9a850 100644
--- a/docs/windows-users-guide.md
+++ b/docs/windows-users-guide.md
@@ -23,5 +23,10 @@ One tricky point is, `administrator` group already has this 
privilege, but it's
 So if your account belongs to `administrator` group (and you don't want to 
change it), you may want to open `command prompt` with `run as administrator` 
and execute processes within that console.
 If you don't want to execute Storm processes directly (not on command prompt), 
please execute processes with `runas /user:administrator` to run as 
administrator account.
 
-Starting with Windows 10 Creators Update, it will be possible to activate a 
Developer Mode that supports creating symbolic links without `run as 
administrator`
-[Symlinks in Windows 
10!](https://blogs.windows.com/buildingapps/2016/12/02/symlinks-windows-10/)
+Starting with Windows 10 Creators Update, it will be possible to activate a 
Developer Mode that supports creating symbolic links without `run as 
administrator`
+[Symlinks in Windows 
10!](https://blogs.windows.com/buildingapps/2016/12/02/symlinks-windows-10/)
+
+Alternatively you can disable usage of symbolic links by setting the config 
`storm.disable.symlinks` to `true`
+on Nimbus and all of the Supervisor nodes.  This will also disable features 
that require symlinks.  Currently this is only downloading
+dependent blobs, but may change in the future.  Some topologies may rely on 
symbolic links to resources in the current working directory of the worker that 
are
+created as a convenience, so it is not a 100% backwards compatible change.

http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj 
b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 9b2d04b..f3b3373 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1653,7 +1653,12 @@
                      topo-conf
                      topology)
           (Utils/validateTopologyBlobStoreMap topo-conf (Sets/newHashSet 
^Iterator (.listKeys blob-store)))
-          )
+          (when (.get conf DISABLE-SYMLINKS)
+            (let [blob-map (.get topo-conf TOPOLOGY-BLOBSTORE-MAP)]
+              (when (and (not-nil? blob-map) (not (.isEmpty blob-map)))
+                (throw (InvalidTopologyException. 
+                         (str "symlinks are disabled so blobs are not 
supported but " TOPOLOGY-BLOBSTORE-MAP
+                              " = " blob-map)))))))
         (swap! (:submitted-count nimbus) inc)
         (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" 
(current-time-secs))
               credentials (.get_creds submitOptions)

http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index 62fafd3..e552a09 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1428,6 +1428,15 @@ public class Config extends HashMap<String, Object> {
     public static final String SUPERVISOR_CPU_CAPACITY = 
"supervisor.cpu.capacity";
 
     /**
+     * On some systems (windows for example) symlinks require special 
privileges that not everyone wants to
+     * grant a headless user.  You can completely disable the use of symlinks 
by setting this config to true, but
+     * by doing so you may also lose some features from storm.  For example 
the blobstore feature
+     * does not currently work without symlinks enabled.
+     */
+    @isBoolean
+    public static final String DISABLE_SYMLINKS = "storm.disable.symlinks";
+
+    /**
      * The jvm opts provided to workers launched by this supervisor.
      * All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%",
      * "%WORKER-PORT%" and "%HEAP-MEM%" substrings are replaced with:

http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
index 4b96197..87d726f 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -41,6 +41,8 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.storm.utils.Utils.OR;
+
 public class AdvancedFSOps {
     private static final Logger LOG = 
LoggerFactory.getLogger(AdvancedFSOps.class);
     
@@ -56,13 +58,14 @@ public class AdvancedFSOps {
         if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
             return new AdvancedRunAsUserFSOps(conf);
         }
-        return new AdvancedFSOps();
+        return new AdvancedFSOps(conf);
     }
     
     private static class AdvancedRunAsUserFSOps extends AdvancedFSOps {
         private final Map<String, Object> _conf;
         
         public AdvancedRunAsUserFSOps(Map<String, Object> conf) {
+            super(conf);
             if (Utils.isOnWindows()) {
                 throw new UnsupportedOperationException("ERROR: Windows 
doesn't support running workers as different users yet");
             }
@@ -116,6 +119,7 @@ public class AdvancedFSOps {
     private static class AdvancedWindowsFSOps extends AdvancedFSOps {
 
         public AdvancedWindowsFSOps(Map<String, Object> conf) {
+            super(conf);
             if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
                 throw new RuntimeException("ERROR: Windows doesn't support 
running workers as different users yet");
             }
@@ -140,10 +144,11 @@ public class AdvancedFSOps {
             return false;
         }
     }
+
+    protected final boolean _symlinksDisabled;
     
-    
-    protected AdvancedFSOps() {
-        //NOOP, but restricted permissions
+    protected AdvancedFSOps(Map<String, Object> conf) {
+        _symlinksDisabled = (boolean)OR(conf.get(Config.DISABLE_SYMLINKS), 
false);
     }
 
     /**
@@ -341,6 +346,9 @@ public class AdvancedFSOps {
      * @throws IOException on any error.
      */
     public void createSymlink(File link, File target) throws IOException {
+        if (_symlinksDisabled) {
+            throw new IOException("Symlinks have been disabled, this should 
not be called");
+        }
         Path plink = link.toPath().toAbsolutePath();
         Path ptarget = target.toPath().toAbsolutePath();
         LOG.debug("Creating symlink [{}] to [{}]", plink, ptarget);

http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
index 3f86b79..859ccf1 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
@@ -43,6 +43,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
+import static org.apache.storm.utils.Utils.OR;
+
 /**
  * Represents a container that a worker will run in.
  */
@@ -85,7 +87,8 @@ public abstract class Container implements Killable {
     protected final LocalAssignment _assignment; //Not set if RECOVER_PARTIAL
     protected final AdvancedFSOps _ops;
     protected ContainerType _type;
-    
+    protected final boolean _symlinksDisabled;
+
     /**
      * Create a new Container.
      * @param type the type of container being made.
@@ -106,6 +109,8 @@ public abstract class Container implements Killable {
         assert(conf != null);
         assert(supervisorId != null);
         
+        _symlinksDisabled = (boolean)OR(conf.get(Config.DISABLE_SYMLINKS), 
false);
+        
         if (ops == null) {
             ops = AdvancedFSOps.make(conf);
         }
@@ -376,11 +381,13 @@ public abstract class Container implements Killable {
      */
     protected void createArtifactsLink() throws IOException {
         _type.assertFull();
-        File workerDir = new File(ConfigUtils.workerRoot(_conf, _workerId));
-        File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, 
_topologyId, _port));
-        if (_ops.fileExists(workerDir)) {
-            LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to 
its port artifacts directory", _workerId, _topologyId);
-            _ops.createSymlink(new File(workerDir, "artifacts"), topoDir);
+        if (!_symlinksDisabled) {
+            File workerDir = new File(ConfigUtils.workerRoot(_conf, 
_workerId));
+            File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, 
_topologyId, _port));
+            if (_ops.fileExists(workerDir)) {
+                LOG.debug("Creating symlinks for worker-id: {} topology-id: {} 
to its port artifacts directory", _workerId, _topologyId);
+                _ops.createSymlink(new File(workerDir, "artifacts"), topoDir);
+            }
         }
     }
     
@@ -417,15 +424,19 @@ public abstract class Container implements Killable {
         }
         resourceFileNames.addAll(blobFileNames);
 
-        LOG.info("Creating symlinks for worker-id: {} storm-id: {} for 
files({}): {}", _workerId, _topologyId, resourceFileNames.size(), 
resourceFileNames);
-        if(targetResourcesDir.exists()) {
-            _ops.createSymlink(new File(workerRoot, 
ConfigUtils.RESOURCES_SUBDIR),  targetResourcesDir );
-        } else {
-            LOG.info("Topology jar for worker-id: {} storm-id: {} does not 
contain re sources directory {}." , _workerId, _topologyId, 
targetResourcesDir.toString() );
-        }
-        for (String fileName : blobFileNames) {
-            _ops.createSymlink(new File(workerRoot, fileName),
-                    new File(stormRoot, fileName));
+        if (!_symlinksDisabled) {
+            LOG.info("Creating symlinks for worker-id: {} storm-id: {} for 
files({}): {}", _workerId, _topologyId, resourceFileNames.size(), 
resourceFileNames);
+            if (targetResourcesDir.exists()) {
+                _ops.createSymlink(new File(workerRoot, 
ConfigUtils.RESOURCES_SUBDIR),  targetResourcesDir );
+            } else {
+                LOG.info("Topology jar for worker-id: {} storm-id: {} does not 
contain re sources directory {}." , _workerId, _topologyId, 
targetResourcesDir.toString() );
+            }
+            for (String fileName : blobFileNames) {
+                _ops.createSymlink(new File(workerRoot, fileName),
+                        new File(stormRoot, fileName));
+            }
+        } else if (blobFileNames.size() > 0) {
+            LOG.warn("Symlinks are disabled, no symlinks created for blobs 
{}", blobFileNames);
         }
     }
     

http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java 
b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
index d3e3925..a2f9857 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
@@ -48,6 +48,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import static org.apache.storm.utils.Utils.OR;
+
 /**
  * This is a wrapper around the Localizer class that provides the desired
  * async interface to Slot.
@@ -94,6 +96,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
     private final Map<String, LocalDownloadedResource> _basicPending;
     private final Map<String, LocalDownloadedResource> _blobPending;
     private final AdvancedFSOps _fsOps;
+    private final boolean _symlinksDisabled;
 
     private class DownloadBaseBlobsDistributed implements Callable<Void> {
         protected final String _topologyId;
@@ -250,24 +253,26 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
                     }
                     List<LocalizedResource> localizedResources = 
_localizer.getBlobs(localResourceList, user, topoName, userDir);
                     _fsOps.setupBlobPermissions(userDir, user);
-                    for (LocalizedResource localizedResource : 
localizedResources) {
-                        String keyName = localizedResource.getKey();
-                        //The sym link we are pointing to
-                        File rsrcFilePath = new 
File(localizedResource.getCurrentSymlinkPath());
+                    if (!_symlinksDisabled) {
+                        for (LocalizedResource localizedResource : 
localizedResources) {
+                            String keyName = localizedResource.getKey();
+                            //The sym link we are pointing to
+                            File rsrcFilePath = new 
File(localizedResource.getCurrentSymlinkPath());
 
-                        String symlinkName = null;
-                        if (blobstoreMap != null) {
-                            Map<String, Object> blobInfo = 
blobstoreMap.get(keyName);
-                            if (blobInfo != null && 
blobInfo.containsKey("localname")) {
-                                symlinkName = (String) 
blobInfo.get("localname");
+                            String symlinkName = null;
+                            if (blobstoreMap != null) {
+                                Map<String, Object> blobInfo = 
blobstoreMap.get(keyName);
+                                if (blobInfo != null && 
blobInfo.containsKey("localname")) {
+                                    symlinkName = (String) 
blobInfo.get("localname");
+                                } else {
+                                    symlinkName = keyName;
+                                }
                             } else {
+                                // all things are from dependencies
                                 symlinkName = keyName;
                             }
-                        } else {
-                            // all things are from dependencies
-                            symlinkName = keyName;
+                            _fsOps.createSymlink(new File(stormroot, 
symlinkName), rsrcFilePath);
                         }
-                        _fsOps.createSymlink(new File(stormroot, symlinkName), 
rsrcFilePath);
                     }
                 }
 
@@ -282,6 +287,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable {
     //Visible for testing
     AsyncLocalizer(Map<String, Object> conf, Localizer localizer, 
AdvancedFSOps ops) {
         _conf = conf;
+        _symlinksDisabled = (boolean)OR(conf.get(Config.DISABLE_SYMLINKS), 
false);
         _isLocalMode = ConfigUtils.isLocalMode(conf);
         _localizer = localizer;
         _execService = Executors.newFixedThreadPool(1,  

http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java 
b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
index 0135397..616406f 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.apache.storm.utils.Utils.OR;
 
 /**
  * Class to download and manage files from the blobstore.  It uses an LRU cache
@@ -345,6 +346,9 @@ public class Localizer {
         if (lrsrc == null) {
           LOG.warn("blob requested for update doesn't exist: {}", key);
           continue;
+        } else if ((boolean)OR(_conf.get(Config.DISABLE_SYMLINKS), false)) {
+          LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
+          continue;
         } else {
           // update it if either the version isn't the latest or if any local 
blob files are missing
           if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
@@ -400,7 +404,9 @@ public class Localizer {
   public synchronized List<LocalizedResource> getBlobs(List<LocalResource> 
localResources,
       String user, String topo, File userFileDir)
       throws AuthorizationException, KeyNotFoundException, IOException {
-
+    if ((boolean)OR(_conf.get(Config.DISABLE_SYMLINKS), false)) {
+      throw new KeyNotFoundException("symlinks are disabled so blobs cannot be 
downloaded.");
+    }
     LocalizedResourceSet newSet = new LocalizedResourceSet(user);
     LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
     if (lrsrcSet == null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java 
b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 5696a43..1a13a6f 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -2076,6 +2076,16 @@ public class Utils {
     }
 
     /**
+     * a or b the first one that is not null
+     * @param a something
+     * @param b something else
+     * @return a or b the first one that is not null
+     */
+    public static <V> V OR(V a, V b) {
+        return a == null ? b : a;
+    }
+
+    /**
      * Writes a posix shell script file to be executed in its own process.
      * @param dir the directory under which the script is to be written
      * @param command the command the script is to execute

Reply via email to