Repository: storm Updated Branches: refs/heads/1.x-branch e0b1333d0 -> c814b7fad
STORM-2038: Disable symlinks with a config option Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f9bfb210 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f9bfb210 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f9bfb210 Branch: refs/heads/1.x-branch Commit: f9bfb2107ca6753c0fcf51c7381df7166eac0fd8 Parents: 825e7f3 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Mon Feb 27 15:06:43 2017 -0600 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Thu Mar 16 13:07:54 2017 -0500 ---------------------------------------------------------------------- conf/defaults.yaml | 1 + docs/windows-users-guide.md | 9 ++++- .../src/clj/org/apache/storm/daemon/nimbus.clj | 7 +++- storm-core/src/jvm/org/apache/storm/Config.java | 9 +++++ .../storm/daemon/supervisor/AdvancedFSOps.java | 16 ++++++-- .../storm/daemon/supervisor/Container.java | 41 +++++++++++++------- .../apache/storm/localizer/AsyncLocalizer.java | 32 ++++++++------- .../org/apache/storm/localizer/Localizer.java | 8 +++- .../src/jvm/org/apache/storm/utils/Utils.java | 10 +++++ 9 files changed, 97 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index e1b7a3c..f89211b 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -58,6 +58,7 @@ storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCo storm.workers.artifacts.dir: "workers-artifacts" storm.health.check.dir: "healthchecks" storm.health.check.timeout.ms: 5000 +storm.disable.symlinks: false ### nimbus.* configs are for the master nimbus.seeds : ["localhost"] http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/docs/windows-users-guide.md 
---------------------------------------------------------------------- diff --git a/docs/windows-users-guide.md b/docs/windows-users-guide.md index 4ebd929..9c9a850 100644 --- a/docs/windows-users-guide.md +++ b/docs/windows-users-guide.md @@ -23,5 +23,10 @@ One tricky point is, `administrator` group already has this privilege, but it's So if your account belongs to `administrator` group (and you don't want to change it), you may want to open `command prompt` with `run as administrator` and execute processes within that console. If you don't want to execute Storm processes directly (not on command prompt), please execute processes with `runas /user:administrator` to run as administrator account. -Starting with Windows 10 Creators Update, it will be possible to activate a Developer Mode that supports creating symbolic links without `run as administrator` -[Symlinks in Windows 10!](https://blogs.windows.com/buildingapps/2016/12/02/symlinks-windows-10/) +Starting with Windows 10 Creators Update, it will be possible to activate a Developer Mode that supports creating symbolic links without `run as administrator` +[Symlinks in Windows 10!](https://blogs.windows.com/buildingapps/2016/12/02/symlinks-windows-10/) + +Alternatively you can disable usage of symbolic links by setting the config `storm.disable.symlinks` to `true` +on Nimbus and all of the Supervisor nodes. This will also disable features that require symlinks. Currently this is only downloading +dependent blobs, but may change in the future. Some topologies may rely on symbolic links to resources in the current working directory of the worker that are +created as a convenience, so it is not a 100% backwards compatible change. 
http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 9b2d04b..f3b3373 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -1653,7 +1653,12 @@ topo-conf topology) (Utils/validateTopologyBlobStoreMap topo-conf (Sets/newHashSet ^Iterator (.listKeys blob-store))) - ) + (when (.get conf DISABLE-SYMLINKS) + (let [blob-map (.get topo-conf TOPOLOGY-BLOBSTORE-MAP)] + (when (and (not-nil? blob-map) (not (.isEmpty blob-map))) + (throw (InvalidTopologyException. + (str "symlinks are disabled so blobs are not supported but " TOPOLOGY-BLOBSTORE-MAP + " = " blob-map))))))) (swap! (:submitted-count nimbus) inc) (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs)) credentials (.get_creds submitOptions) http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 62fafd3..e552a09 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -1428,6 +1428,15 @@ public class Config extends HashMap<String, Object> { public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity"; /** + * On some systems (windows for example) symlinks require special privileges that not everyone wants to + * grant a headless user. You can completely disable the use of symlinks by setting this config to true, but + * by doing so you may also lose some features from storm. 
For example the blobstore feature + * does not currently work without symlinks enabled. + */ + @isBoolean + public static final String DISABLE_SYMLINKS = "storm.disable.symlinks"; + + /** * The jvm opts provided to workers launched by this supervisor. * All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%", * "%WORKER-PORT%" and "%HEAP-MEM%" substrings are replaced with: http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java index 4b96197..87d726f 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java @@ -41,6 +41,8 @@ import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.storm.utils.Utils.OR; + public class AdvancedFSOps { private static final Logger LOG = LoggerFactory.getLogger(AdvancedFSOps.class); @@ -56,13 +58,14 @@ public class AdvancedFSOps { if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { return new AdvancedRunAsUserFSOps(conf); } - return new AdvancedFSOps(); + return new AdvancedFSOps(conf); } private static class AdvancedRunAsUserFSOps extends AdvancedFSOps { private final Map<String, Object> _conf; public AdvancedRunAsUserFSOps(Map<String, Object> conf) { + super(conf); if (Utils.isOnWindows()) { throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet"); } @@ -116,6 +119,7 @@ public class AdvancedFSOps { private static class AdvancedWindowsFSOps extends AdvancedFSOps { public AdvancedWindowsFSOps(Map<String, Object> conf) { + super(conf); if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { throw new RuntimeException("ERROR: Windows doesn't support running workers as different users yet"); } @@ -140,10 +144,11 @@ public class AdvancedFSOps { return false; } } + + protected final boolean _symlinksDisabled; - - protected AdvancedFSOps() { - //NOOP, but restricted permissions + protected AdvancedFSOps(Map<String, Object> conf) { + _symlinksDisabled = (boolean)OR(conf.get(Config.DISABLE_SYMLINKS), false); } /** @@ -341,6 +346,9 @@ public class AdvancedFSOps { * @throws IOException on any error. */ public void createSymlink(File link, File target) throws IOException { + if (_symlinksDisabled) { + throw new IOException("Symlinks have been disabled, this should not be called"); + } Path plink = link.toPath().toAbsolutePath(); Path ptarget = target.toPath().toAbsolutePath(); LOG.debug("Creating symlink [{}] to [{}]", plink, ptarget); http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java index 3f86b79..859ccf1 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java @@ -43,6 +43,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; +import static org.apache.storm.utils.Utils.OR; + /** * Represents a container that a worker will run in. */ @@ -85,7 +87,8 @@ public abstract class Container implements Killable { protected final LocalAssignment _assignment; //Not set if RECOVER_PARTIAL protected final AdvancedFSOps _ops; protected ContainerType _type; - + protected final boolean _symlinksDisabled; + /** * Create a new Container. 
* @param type the type of container being made. @@ -106,6 +109,8 @@ public abstract class Container implements Killable { assert(conf != null); assert(supervisorId != null); + _symlinksDisabled = (boolean)OR(conf.get(Config.DISABLE_SYMLINKS), false); + if (ops == null) { ops = AdvancedFSOps.make(conf); } @@ -376,11 +381,13 @@ public abstract class Container implements Killable { */ protected void createArtifactsLink() throws IOException { _type.assertFull(); - File workerDir = new File(ConfigUtils.workerRoot(_conf, _workerId)); - File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port)); - if (_ops.fileExists(workerDir)) { - LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", _workerId, _topologyId); - _ops.createSymlink(new File(workerDir, "artifacts"), topoDir); + if (!_symlinksDisabled) { + File workerDir = new File(ConfigUtils.workerRoot(_conf, _workerId)); + File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port)); + if (_ops.fileExists(workerDir)) { + LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", _workerId, _topologyId); + _ops.createSymlink(new File(workerDir, "artifacts"), topoDir); + } } } @@ -417,15 +424,19 @@ public abstract class Container implements Killable { } resourceFileNames.addAll(blobFileNames); - LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", _workerId, _topologyId, resourceFileNames.size(), resourceFileNames); - if(targetResourcesDir.exists()) { - _ops.createSymlink(new File(workerRoot, ConfigUtils.RESOURCES_SUBDIR), targetResourcesDir ); - } else { - LOG.info("Topology jar for worker-id: {} storm-id: {} does not contain re sources directory {}." 
, _workerId, _topologyId, targetResourcesDir.toString() ); - } - for (String fileName : blobFileNames) { - _ops.createSymlink(new File(workerRoot, fileName), - new File(stormRoot, fileName)); + if (!_symlinksDisabled) { + LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", _workerId, _topologyId, resourceFileNames.size(), resourceFileNames); + if (targetResourcesDir.exists()) { + _ops.createSymlink(new File(workerRoot, ConfigUtils.RESOURCES_SUBDIR), targetResourcesDir ); + } else { + LOG.info("Topology jar for worker-id: {} storm-id: {} does not contain re sources directory {}." , _workerId, _topologyId, targetResourcesDir.toString() ); + } + for (String fileName : blobFileNames) { + _ops.createSymlink(new File(workerRoot, fileName), + new File(stormRoot, fileName)); + } + } else if (blobFileNames.size() > 0) { + LOG.warn("Symlinks are disabled, no symlinks created for blobs {}", blobFileNames); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java index d3e3925..a2f9857 100644 --- a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java +++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java @@ -48,6 +48,8 @@ import org.slf4j.LoggerFactory; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.storm.utils.Utils.OR; + /** * This is a wrapper around the Localizer class that provides the desired * async interface to Slot. 
@@ -94,6 +96,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { private final Map<String, LocalDownloadedResource> _basicPending; private final Map<String, LocalDownloadedResource> _blobPending; private final AdvancedFSOps _fsOps; + private final boolean _symlinksDisabled; private class DownloadBaseBlobsDistributed implements Callable<Void> { protected final String _topologyId; @@ -250,24 +253,26 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable { } List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, user, topoName, userDir); _fsOps.setupBlobPermissions(userDir, user); - for (LocalizedResource localizedResource : localizedResources) { - String keyName = localizedResource.getKey(); - //The sym link we are pointing to - File rsrcFilePath = new File(localizedResource.getCurrentSymlinkPath()); + if (!_symlinksDisabled) { + for (LocalizedResource localizedResource : localizedResources) { + String keyName = localizedResource.getKey(); + //The sym link we are pointing to + File rsrcFilePath = new File(localizedResource.getCurrentSymlinkPath()); - String symlinkName = null; - if (blobstoreMap != null) { - Map<String, Object> blobInfo = blobstoreMap.get(keyName); - if (blobInfo != null && blobInfo.containsKey("localname")) { - symlinkName = (String) blobInfo.get("localname"); + String symlinkName = null; + if (blobstoreMap != null) { + Map<String, Object> blobInfo = blobstoreMap.get(keyName); + if (blobInfo != null && blobInfo.containsKey("localname")) { + symlinkName = (String) blobInfo.get("localname"); + } else { + symlinkName = keyName; + } } else { + // all things are from dependencies symlinkName = keyName; } - } else { - // all things are from dependencies - symlinkName = keyName; + _fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath); } - _fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath); } } @@ -282,6 +287,7 @@ public class AsyncLocalizer implements ILocalizer, 
Shutdownable { //Visible for testing AsyncLocalizer(Map<String, Object> conf, Localizer localizer, AdvancedFSOps ops) { _conf = conf; + _symlinksDisabled = (boolean)OR(conf.get(Config.DISABLE_SYMLINKS), false); _isLocalMode = ConfigUtils.isLocalMode(conf); _localizer = localizer; _execService = Executors.newFixedThreadPool(1, http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java index 0135397..616406f 100644 --- a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java +++ b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java @@ -56,6 +56,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static org.apache.storm.utils.Utils.OR; /** * Class to download and manage files from the blobstore. 
It uses an LRU cache @@ -345,6 +346,9 @@ public class Localizer { if (lrsrc == null) { LOG.warn("blob requested for update doesn't exist: {}", key); continue; + } else if ((boolean)OR(_conf.get(Config.DISABLE_SYMLINKS), false)) { + LOG.warn("symlinks are disabled so blobs cannot be downloaded."); + continue; } else { // update it if either the version isn't the latest or if any local blob files are missing if (!isLocalizedResourceUpToDate(lrsrc, blobstore) || @@ -400,7 +404,9 @@ public class Localizer { public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources, String user, String topo, File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException { - + if ((boolean)OR(_conf.get(Config.DISABLE_SYMLINKS), false)) { + throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded."); + } LocalizedResourceSet newSet = new LocalizedResourceSet(user); LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet); if (lrsrcSet == null) { http://git-wip-us.apache.org/repos/asf/storm/blob/f9bfb210/storm-core/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index 5696a43..1a13a6f 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -2076,6 +2076,16 @@ public class Utils { } /** + * a or b the first one that is not null + * @param a something + * @param b something else + * @return a or b the first one that is not null + */ + public static <V> V OR(V a, V b) { + return a == null ? b : a; + } + + /** * Writes a posix shell script file to be executed in its own process. * @param dir the directory under which the script is to be written * @param command the command the script is to execute