http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 9403b4f..f020919 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -17,288 +17,31 @@ package org.apache.zeppelin.interpreter; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import org.apache.commons.lang.NullArgumentException; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; -import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.AngularObjectRegistryListener; -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.sonatype.aether.RepositoryException; -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Properties; /** + * //TODO(zjffdu) considering to move to InterpreterSettingManager + * * Manage interpreters. */ -public class InterpreterFactory implements InterpreterGroupFactory { - private static final Logger logger = LoggerFactory.getLogger(InterpreterFactory.class); - - private Map<String, URLClassLoader> cleanCl = - Collections.synchronizedMap(new HashMap<String, URLClassLoader>()); - - private ZeppelinConfiguration conf; +public class InterpreterFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterFactory.class); private final InterpreterSettingManager interpreterSettingManager; - private AngularObjectRegistryListener angularObjectRegistryListener; - private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; - private final ApplicationEventListener appEventListener; - - private boolean shiroEnabled; - - private Map<String, String> env = new HashMap<>(); - - private Interpreter devInterpreter; - - public InterpreterFactory(ZeppelinConfiguration conf, - AngularObjectRegistryListener angularObjectRegistryListener, - RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appEventListener, DependencyResolver depResolver, - boolean shiroEnabled, InterpreterSettingManager interpreterSettingManager) - throws InterpreterException, IOException, RepositoryException { - this.conf = conf; - this.angularObjectRegistryListener = angularObjectRegistryListener; - this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; - this.appEventListener = appEventListener; - this.shiroEnabled = shiroEnabled; + public InterpreterFactory(InterpreterSettingManager interpreterSettingManager) { this.interpreterSettingManager = interpreterSettingManager; - //TODO(jl): Fix it not to use InterpreterGroupFactory - interpreterSettingManager.setInterpreterGroupFactory(this); - - logger.info("shiroEnabled: {}", shiroEnabled); - } - - /** - * @param id interpreterGroup id. Combination of interpreterSettingId + noteId/userId/shared - * depends on interpreter mode - */ - @Override - public InterpreterGroup createInterpreterGroup(String id, InterpreterOption option) - throws InterpreterException, NullArgumentException { - - //When called from REST API without option we receive NPE - if (option == null) { - throw new NullArgumentException("option"); - } - - AngularObjectRegistry angularObjectRegistry; - - InterpreterGroup interpreterGroup = new InterpreterGroup(id); - if (option.isRemote()) { - angularObjectRegistry = - new RemoteAngularObjectRegistry(id, angularObjectRegistryListener, interpreterGroup); - } else { - angularObjectRegistry = new AngularObjectRegistry(id, angularObjectRegistryListener); - - // TODO(moon) : create distributed resource pool for local interpreters and set - } - - interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); - return interpreterGroup; - } - - public void createInterpretersForNote(InterpreterSetting interpreterSetting, String user, - String noteId, String interpreterSessionKey) { - InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId); - InterpreterOption option = interpreterSetting.getOption(); - Properties properties = interpreterSetting.getFlatProperties(); - // if interpreters are already there, wait until they're being removed - synchronized (interpreterGroup) { - long interpreterRemovalWaitStart = System.nanoTime(); - // interpreter process supposed to be terminated by RemoteInterpreterProcess.dereference() - // in ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT msec. However, if termination of the process and - // removal from interpreter group take too long, throw an error. - long minTimeout = 10L * 1000 * 1000000; // 10 sec - long interpreterRemovalWaitTimeout = Math.max(minTimeout, - conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2); - while (interpreterGroup.containsKey(interpreterSessionKey)) { - if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) { - throw new InterpreterException("Can not create interpreter"); - } - try { - interpreterGroup.wait(1000); - } catch (InterruptedException e) { - logger.debug(e.getMessage(), e); - } - } - } - - logger.info("Create interpreter instance {} for note {}", interpreterSetting.getName(), noteId); - - List<InterpreterInfo> interpreterInfos = interpreterSetting.getInterpreterInfos(); - String path = interpreterSetting.getPath(); - InterpreterRunner runner = interpreterSetting.getInterpreterRunner(); - Interpreter interpreter; - for (InterpreterInfo info : interpreterInfos) { - if (option.isRemote()) { - if (option.isExistingProcess()) { - interpreter = - connectToRemoteRepl(interpreterSessionKey, info.getClassName(), option.getHost(), - option.getPort(), properties, interpreterSetting.getId(), user, - option.isUserImpersonate); - } else { - interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(), - properties, interpreterSetting.getId(), user, option.isUserImpersonate(), runner); - } - } else { - interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties); - } - - synchronized (interpreterGroup) { - List<Interpreter> interpreters = interpreterGroup.get(interpreterSessionKey); - if (null == interpreters) { - interpreters = new ArrayList<>(); - interpreterGroup.put(interpreterSessionKey, interpreters); - } - if (info.isDefaultInterpreter()) { - interpreters.add(0, interpreter); - } else { - interpreters.add(interpreter); - } - } - logger.info("Interpreter {} {} created", interpreter.getClassName(), interpreter.hashCode()); - interpreter.setInterpreterGroup(interpreterGroup); - } - } - - private Interpreter createRepl(String dirName, String className, Properties property) - throws InterpreterException { - logger.info("Create repl {} from {}", className, dirName); - - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - try { - - URLClassLoader ccl = cleanCl.get(dirName); - if (ccl == null) { - // classloader fallback - ccl = URLClassLoader.newInstance(new URL[]{}, oldcl); - } - - boolean separateCL = true; - try { // check if server's classloader has driver already. - Class cls = this.getClass().forName(className); - if (cls != null) { - separateCL = false; - } - } catch (Exception e) { - logger.error("exception checking server classloader driver", e); - } - - URLClassLoader cl; - - if (separateCL == true) { - cl = URLClassLoader.newInstance(new URL[]{}, ccl); - } else { - cl = ccl; - } - Thread.currentThread().setContextClassLoader(cl); - - Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className); - Constructor<Interpreter> constructor = - replClass.getConstructor(new Class[]{Properties.class}); - Interpreter repl = constructor.newInstance(property); - repl.setClassloaderUrls(ccl.getURLs()); - LazyOpenInterpreter intp = new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl)); - return intp; - } catch (SecurityException e) { - throw new InterpreterException(e); - } catch (NoSuchMethodException e) { - throw new InterpreterException(e); - } catch (IllegalArgumentException e) { - throw new InterpreterException(e); - } catch (InstantiationException e) { - throw new InterpreterException(e); - } catch (IllegalAccessException e) { - throw new InterpreterException(e); - } catch (InvocationTargetException e) { - throw new InterpreterException(e); - } catch (ClassNotFoundException e) { - throw new InterpreterException(e); - } finally { - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - private Interpreter connectToRemoteRepl(String interpreterSessionKey, String className, - String host, int port, Properties property, String interpreterSettingId, String userName, - Boolean isUserImpersonate) { - int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); - int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE); - String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId; - LazyOpenInterpreter intp = new LazyOpenInterpreter( - new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath, - connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener, - userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT))); - return intp; - } - - Interpreter createRemoteRepl(String interpreterPath, String interpreterSessionKey, - String className, Properties property, String interpreterSettingId, - String userName, Boolean isUserImpersonate, InterpreterRunner interpreterRunner) { - int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); - String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId; - int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE); - String interpreterRunnerPath; - String interpreterGroupName = interpreterSettingManager.get(interpreterSettingId).getName(); - if (null != interpreterRunner) { - interpreterRunnerPath = interpreterRunner.getPath(); - Path p = Paths.get(interpreterRunnerPath); - if (!p.isAbsolute()) { - interpreterRunnerPath = Joiner.on(File.separator) - .join(interpreterPath, interpreterRunnerPath); - } - } else { - interpreterRunnerPath = conf.getInterpreterRemoteRunnerPath(); - } - - RemoteInterpreter remoteInterpreter = - new RemoteInterpreter(property, interpreterSessionKey, className, - interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize, - remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate, - conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT), interpreterGroupName); - remoteInterpreter.addEnv(env); - - return new LazyOpenInterpreter(remoteInterpreter); - } - - private List<Interpreter> createOrGetInterpreterList(String user, String noteId, - InterpreterSetting setting) { - InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId); - synchronized (interpreterGroup) { - String interpreterSessionKey = - interpreterSettingManager.getInterpreterSessionKey(user, noteId, setting); - if (!interpreterGroup.containsKey(interpreterSessionKey)) { - createInterpretersForNote(setting, user, noteId, interpreterSessionKey); - } - return interpreterGroup.get(interpreterSessionKey); - } } private InterpreterSetting getInterpreterSettingByGroup(List<InterpreterSetting> settings, String group) { - Preconditions.checkNotNull(group, "group should be not null"); + Preconditions.checkNotNull(group, "group should be not null"); for (InterpreterSetting setting : settings) { if (group.equals(setting.getName())) { return setting; @@ -307,80 +50,41 @@ public class InterpreterFactory implements InterpreterGroupFactory { return null; } - private String getInterpreterClassFromInterpreterSetting(InterpreterSetting setting, - String name) { - Preconditions.checkNotNull(name, "name should be not null"); - - for (InterpreterInfo info : setting.getInterpreterInfos()) { - String infoName = info.getName(); - if (null != info.getName() && name.equals(infoName)) { - return info.getClassName(); - } - } - return null; - } - - private Interpreter getInterpreter(String user, String noteId, InterpreterSetting setting, - String name) { - Preconditions.checkNotNull(noteId, "noteId should be not null"); - Preconditions.checkNotNull(setting, "setting should be not null"); - Preconditions.checkNotNull(name, "name should be not null"); - - String className; - if (null != (className = getInterpreterClassFromInterpreterSetting(setting, name))) { - List<Interpreter> interpreterGroup = createOrGetInterpreterList(user, noteId, setting); - for (Interpreter interpreter : interpreterGroup) { - if (className.equals(interpreter.getClassName())) { - return interpreter; - } - } - } - return null; - } - public Interpreter getInterpreter(String user, String noteId, String replName) { List<InterpreterSetting> settings = interpreterSettingManager.getInterpreterSettings(noteId); InterpreterSetting setting; Interpreter interpreter; if (settings == null || settings.size() == 0) { + LOGGER.error("No interpreter is binded to this note: " + noteId); return null; } - if (replName == null || replName.trim().length() == 0) { - // get default settings (first available) - // TODO(jl): Fix it in case of returning null - InterpreterSetting defaultSettings = interpreterSettingManager - .getDefaultInterpreterSetting(settings); - return createOrGetInterpreterList(user, noteId, defaultSettings).get(0); + if (StringUtils.isBlank(replName)) { + // Get the default interpreter of the first interpreter binding + InterpreterSetting defaultSetting = settings.get(0); + return defaultSetting.getDefaultInterpreter(user, noteId); } String[] replNameSplit = replName.split("\\."); if (replNameSplit.length == 2) { - String group = null; - String name = null; - group = replNameSplit[0]; - name = replNameSplit[1]; - + String group = replNameSplit[0]; + String name = replNameSplit[1]; setting = getInterpreterSettingByGroup(settings, group); - if (null != setting) { - interpreter = getInterpreter(user, noteId, setting, name); - + interpreter = setting.getInterpreter(user, noteId, name); if (null != interpreter) { return interpreter; } } - throw new InterpreterException(replName + " interpreter not found"); } else { // first assume replName is 'name' of interpreter. ('groupName' is ommitted) // search 'name' from first (default) interpreter group // TODO(jl): Handle with noteId to support defaultInterpreter per note. - setting = interpreterSettingManager.getDefaultInterpreterSetting(settings); - - interpreter = getInterpreter(user, noteId, setting, replName); + setting = settings.get(0); + interpreter = setting.getInterpreter(user, noteId, replName); if (null != interpreter) { return interpreter; @@ -391,33 +95,17 @@ public class InterpreterFactory implements InterpreterGroupFactory { setting = getInterpreterSettingByGroup(settings, replName); if (null != setting) { - List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, setting); - if (null != interpreters) { - return interpreters.get(0); - } + return setting.getDefaultInterpreter(user, noteId); } // Support the legacy way to use it for (InterpreterSetting s : settings) { if (s.getGroup().equals(replName)) { - List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, s); - if (null != interpreters) { - return interpreters.get(0); - } + return setting.getDefaultInterpreter(user, noteId); } } } - + //TODO(zjffdu) throw InterpreterException instead of return null return null; } - - public Map<String, String> getEnv() { - return env; - } - - public void setEnv(Map<String, String> env) { - this.env = env; - } - - }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java deleted file mode 100644 index 3b9be40..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter; - -import org.apache.commons.lang.NullArgumentException; - -/** - * Created InterpreterGroup - */ -public interface InterpreterGroupFactory { - InterpreterGroup createInterpreterGroup(String interpreterGroupId, InterpreterOption option); -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java index ca688dc..d7593d5 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java @@ -19,22 +19,78 @@ package org.apache.zeppelin.interpreter; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.internal.StringMap; +import org.apache.commons.io.IOUtils; import org.apache.zeppelin.common.JsonSerializable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.sonatype.aether.repository.RemoteRepository; -import java.util.List; -import java.util.Map; +import java.io.BufferedReader; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermission; +import java.util.*; + +import static java.nio.file.attribute.PosixFilePermission.OWNER_READ; +import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE; /** * */ public class InterpreterInfoSaving implements JsonSerializable { - private static final Gson gson = new GsonBuilder().setPrettyPrinting().create(); + private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterInfoSaving.class); + private static final Gson gson = new GsonBuilder().setPrettyPrinting().create(); + + public Map<String, InterpreterSetting> interpreterSettings = new HashMap<>(); + public Map<String, List<String>> interpreterBindings = new HashMap<>(); + public List<RemoteRepository> interpreterRepositories = new ArrayList<>(); + + public static InterpreterInfoSaving loadFromFile(Path file) throws IOException { + LOGGER.info("Load interpreter setting from file: " + file); + InterpreterInfoSaving infoSaving = null; + try (BufferedReader json = Files.newBufferedReader(file, StandardCharsets.UTF_8)) { + JsonParser jsonParser = new JsonParser(); + JsonObject jsonObject = jsonParser.parse(json).getAsJsonObject(); + infoSaving = InterpreterInfoSaving.fromJson(jsonObject.toString()); - public Map<String, InterpreterSetting> interpreterSettings; - public Map<String, List<String>> interpreterBindings; - public List<RemoteRepository> interpreterRepositories; + if (infoSaving != null && infoSaving.interpreterSettings != null) { + for (InterpreterSetting interpreterSetting : infoSaving.interpreterSettings.values()) { + // Always use separate interpreter process + // While we decided to turn this feature on always (without providing + // enable/disable option on GUI). + // previously created setting should turn this feature on here. + interpreterSetting.getOption().setRemote(true); + interpreterSetting.convertPermissionsFromUsersToOwners( + jsonObject.getAsJsonObject("interpreterSettings") + .getAsJsonObject(interpreterSetting.getId())); + } + } + } + return infoSaving == null ? new InterpreterInfoSaving() : infoSaving; + } + + public void saveToFile(Path file) throws IOException { + if (!Files.exists(file)) { + Files.createFile(file); + try { + Set<PosixFilePermission> permissions = EnumSet.of(OWNER_READ, OWNER_WRITE); + Files.setPosixFilePermissions(file, permissions); + } catch (UnsupportedOperationException e) { + // File system does not support Posix file permissions (likely windows) - continue anyway. + LOGGER.warn("unable to setPosixFilePermissions on '{}'.", file); + }; + } + LOGGER.info("Save Interpreter Settings to " + file); + IOUtils.write(this.toJson(), new FileOutputStream(file.toFile())); + } public String toJson() { return gson.toJson(this); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 752b4e2..9f4cfd4 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -17,8 +17,39 @@ package org.apache.zeppelin.interpreter; -import java.util.Arrays; -import java.util.Collection; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.annotations.SerializedName; +import com.google.gson.internal.StringMap; +import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -26,104 +57,253 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.zeppelin.dep.Dependency; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.annotations.SerializedName; -import com.google.gson.internal.StringMap; - -import static org.apache.zeppelin.notebook.utility.IdHashes.generateId; +import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE; +import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT; +import static org.apache.zeppelin.util.IdHashes.generateId; /** - * Interpreter settings + * Represent one InterpreterSetting in the interpreter setting page */ public class InterpreterSetting { - private static final Logger logger = LoggerFactory.getLogger(InterpreterSetting.class); + private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterSetting.class); private static final String SHARED_PROCESS = "shared_process"; + private static final String SHARED_SESSION = "shared_session"; + private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of( + "language", (Object) "text", + "editOnDblClick", false); + private String id; private String name; - // always be null in case of InterpreterSettingRef + // the original interpreter setting template name where it is created from private String group; - private transient Map<String, String> infos; - - // Map of the note and paragraphs which has runtime infos generated by this interpreter setting. - // This map is used to clear the infos in paragraph when the interpretersetting is restarted - private transient Map<String, Set<String>> runtimeInfosToBeCleared; + //TODO(zjffdu) make the interpreter.json consistent with interpreter-setting.json /** - * properties can be either Map<String, DefaultInterpreterProperty> or - * Map<String, InterpreterProperty> + * properties can be either Properties or Map<String, InterpreterProperty> * properties should be: - * - Map<String, InterpreterProperty> when Interpreter instances are saved to - * `conf/interpreter.json` file - * - Map<String, DefaultInterpreterProperty> when Interpreters are registered + * - Properties when Interpreter instances are saved to `conf/interpreter.json` file + * - Map<String, InterpreterProperty> when Interpreters are registered * : this is needed after https://github.com/apache/zeppelin/pull/1145 * which changed the way of getting default interpreter setting AKA interpreterSettingsRef + * Note(mina): In order to simplify the implementation, I chose to change properties + * from Properties to Object instead of creating new classes. */ - private Object properties; + private Object properties = new Properties(); + private Status status; private String errorReason; @SerializedName("interpreterGroup") private List<InterpreterInfo> interpreterInfos; - private final transient Map<String, InterpreterGroup> interpreterGroupRef = new HashMap<>(); - private List<Dependency> dependencies = new LinkedList<>(); - private InterpreterOption option; - private transient String path; + + private List<Dependency> dependencies = new ArrayList<>(); + private InterpreterOption option = new InterpreterOption(true); @SerializedName("runner") private InterpreterRunner interpreterRunner; - @Deprecated - private transient InterpreterGroupFactory interpreterGroupFactory; + /////////////////////////////////////////////////////////////////////////////////////////// + private transient InterpreterSettingManager interpreterSettingManager; + private transient String interpreterDir; + private final transient Map<String, ManagedInterpreterGroup> interpreterGroups = + new ConcurrentHashMap<>(); private final transient ReentrantReadWriteLock.ReadLock interpreterGroupReadLock; private final transient ReentrantReadWriteLock.WriteLock interpreterGroupWriteLock; + private transient AngularObjectRegistryListener angularObjectRegistryListener; + private transient RemoteInterpreterProcessListener remoteInterpreterProcessListener; + private transient ApplicationEventListener appEventListener; + private transient DependencyResolver dependencyResolver; + + private transient Map<String, String> infos; + + // Map of the note and paragraphs which has runtime infos generated by this interpreter setting. + // This map is used to clear the infos in paragraph when the interpretersetting is restarted + private transient Map<String, Set<String>> runtimeInfosToBeCleared; + + private transient ZeppelinConfiguration conf = new ZeppelinConfiguration(); + + private transient Map<String, URLClassLoader> cleanCl = + Collections.synchronizedMap(new HashMap<String, URLClassLoader>()); + /////////////////////////////////////////////////////////////////////////////////////////// + + + /** + * Builder class for InterpreterSetting + */ + public static class Builder { + private InterpreterSetting interpreterSetting; + + public Builder() { + this.interpreterSetting = new InterpreterSetting(); + } + + public Builder setId(String id) { + interpreterSetting.id = id; + return this; + } + + public Builder setName(String name) { + interpreterSetting.name = name; + return this; + } + + public Builder setGroup(String group) { + interpreterSetting.group = group; + return this; + } + + public Builder setInterpreterInfos(List<InterpreterInfo> interpreterInfos) { + interpreterSetting.interpreterInfos = interpreterInfos; + return this; + } + + public Builder setProperties(Object properties) { + interpreterSetting.properties = properties; + return this; + } + + public Builder setOption(InterpreterOption option) { + interpreterSetting.option = option; + return this; + } + + public Builder setInterpreterDir(String interpreterDir) { + interpreterSetting.interpreterDir = interpreterDir; + return this; + } + + public Builder setRunner(InterpreterRunner runner) { + interpreterSetting.interpreterRunner = runner; + return this; + } + + public Builder setDependencies(List<Dependency> dependencies) { + interpreterSetting.dependencies = dependencies; + return this; + } + + public Builder setConf(ZeppelinConfiguration conf) { + interpreterSetting.conf = conf; + return this; + } + + public Builder setDependencyResolver(DependencyResolver dependencyResolver) { + interpreterSetting.dependencyResolver = dependencyResolver; + return this; + } + +// public Builder setInterpreterRunner(InterpreterRunner runner) { +// interpreterSetting.interpreterRunner = runner; +// return this; +// } + + public Builder setIntepreterSettingManager( + InterpreterSettingManager interpreterSettingManager) { + interpreterSetting.interpreterSettingManager = interpreterSettingManager; + return this; + } + + public Builder setRemoteInterpreterProcessListener(RemoteInterpreterProcessListener + remoteInterpreterProcessListener) { + interpreterSetting.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + return this; + } + + public Builder setAngularObjectRegistryListener( + AngularObjectRegistryListener angularObjectRegistryListener) { + interpreterSetting.angularObjectRegistryListener = angularObjectRegistryListener; + return this; + } + + public Builder setApplicationEventListener(ApplicationEventListener applicationEventListener) { + interpreterSetting.appEventListener = applicationEventListener; + return this; + } + + public InterpreterSetting create() { + // post processing + interpreterSetting.postProcessing(); + return interpreterSetting; + } + } + public InterpreterSetting() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.id = generateId(); interpreterGroupReadLock = lock.readLock(); interpreterGroupWriteLock = lock.writeLock(); } - public InterpreterSetting(String id, String name, String group, - List<InterpreterInfo> interpreterInfos, Object properties, List<Dependency> dependencies, - InterpreterOption option, String path, InterpreterRunner runner) { - this(); - this.id = id; - this.name = name; - this.group = group; - this.interpreterInfos = interpreterInfos; - this.properties = properties; - this.dependencies = dependencies; - this.option = option; - this.path = path; + void postProcessing() { this.status = Status.READY; - this.interpreterRunner = runner; - } - - public InterpreterSetting(String name, String group, List<InterpreterInfo> interpreterInfos, - Object properties, List<Dependency> dependencies, InterpreterOption option, String path, - InterpreterRunner runner) { - this(generateId(), name, group, interpreterInfos, properties, dependencies, option, path, - runner); } /** - * Create interpreter from interpreterSettingRef + * Create interpreter from InterpreterSettingTemplate * - * @param o interpreterSetting from interpreterSettingRef + * @param o interpreterSetting from InterpreterSettingTemplate */ public InterpreterSetting(InterpreterSetting o) { - this(generateId(), o.getName(), o.getGroup(), o.getInterpreterInfos(), o.getProperties(), - o.getDependencies(), o.getOption(), o.getPath(), o.getInterpreterRunner()); + this(); + this.id = generateId(); + this.name = o.name; + this.group = o.group; + this.properties = convertInterpreterProperties( + (Map<String, DefaultInterpreterProperty>) o.getProperties()); + this.interpreterInfos = new ArrayList<>(o.getInterpreterInfos()); + this.option = InterpreterOption.fromInterpreterOption(o.getOption()); + this.dependencies = new ArrayList<>(o.getDependencies()); + this.interpreterDir = o.getInterpreterDir(); + this.interpreterRunner = o.getInterpreterRunner(); + this.conf = o.getConf(); + } + + public AngularObjectRegistryListener getAngularObjectRegistryListener() { + return angularObjectRegistryListener; + } + + public RemoteInterpreterProcessListener getRemoteInterpreterProcessListener() { + return remoteInterpreterProcessListener; + } + + public ApplicationEventListener getAppEventListener() { + return appEventListener; + } + + public DependencyResolver getDependencyResolver() { + return dependencyResolver; + } + + public InterpreterSettingManager getInterpreterSettingManager() { + return interpreterSettingManager; + } + + public void setAngularObjectRegistryListener(AngularObjectRegistryListener + angularObjectRegistryListener) { + this.angularObjectRegistryListener = angularObjectRegistryListener; + } + + public void setAppEventListener(ApplicationEventListener appEventListener) { + this.appEventListener = appEventListener; + } + + public void setRemoteInterpreterProcessListener(RemoteInterpreterProcessListener + remoteInterpreterProcessListener) { + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + } + + public void setDependencyResolver(DependencyResolver dependencyResolver) { + this.dependencyResolver = dependencyResolver; + } + + public void setInterpreterSettingManager(InterpreterSettingManager interpreterSettingManager) { + this.interpreterSettingManager = interpreterSettingManager; } public String getId() { @@ -138,10 +318,9 @@ public class InterpreterSetting { return group; } - private String getInterpreterProcessKey(String user, String noteId) { - InterpreterOption option = getOption(); + private String getInterpreterGroupId(String user, String noteId) { String key; - if (getOption().isExistingProcess) { + if (option.isExistingProcess) { key = Constants.EXISTING_PROCESS; } else if (getOption().isProcess()) { key = (option.perUserIsolated() ? user : "") + ":" + (option.perNoteIsolated() ? noteId : ""); @@ -149,40 +328,11 @@ public class InterpreterSetting { key = SHARED_PROCESS; } - //logger.debug("getInterpreterProcessKey: {} for InterpreterSetting Id: {}, Name: {}", - // key, getId(), getName()); - return key; + //TODO(zjffdu) we encode interpreter setting id into groupId, this is not a good design + return id + ":" + key; } - private boolean isEqualInterpreterKeyProcessKey(String refKey, String processKey) { - InterpreterOption option = getOption(); - int validCount = 0; - if (getOption().isProcess() - && !(option.perUserIsolated() == true && option.perNoteIsolated() == true)) { - - List<String> processList = Arrays.asList(processKey.split(":")); - List<String> refList = Arrays.asList(refKey.split(":")); - - if (refList.size() <= 1 || processList.size() <= 1) { - return refKey.equals(processKey); - } - - if (processList.get(0).equals("") || processList.get(0).equals(refList.get(0))) { - validCount = validCount + 1; - } - - if (processList.get(1).equals("") || processList.get(1).equals(refList.get(1))) { - validCount = validCount + 1; - } - - return (validCount >= 2); - } else { - return refKey.equals(processKey); - } - } - - String getInterpreterSessionKey(String user, String noteId) { - InterpreterOption option = getOption(); + private String getInterpreterSessionId(String user, String noteId) { String key; if (option.isExistingProcess()) { key = Constants.EXISTING_PROCESS; @@ -193,120 +343,153 @@ public class InterpreterSetting { } else if (option.perNoteScoped()) { key = noteId; } else { - key = "shared_session"; + key = SHARED_SESSION; } - logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " + - "{}", key, noteId, user, getName()); return key; } - public InterpreterGroup getInterpreterGroup(String user, String noteId) { - String key = getInterpreterProcessKey(user, noteId); - if (!interpreterGroupRef.containsKey(key)) { - String interpreterGroupId = getId() + ":" + key; - InterpreterGroup intpGroup = - interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption()); - + public ManagedInterpreterGroup getOrCreateInterpreterGroup(String user, String noteId) { + String groupId = getInterpreterGroupId(user, noteId); + try { interpreterGroupWriteLock.lock(); - logger.debug("create interpreter group with groupId:" + interpreterGroupId); - interpreterGroupRef.put(key, intpGroup); - interpreterGroupWriteLock.unlock(); + if (!interpreterGroups.containsKey(groupId)) { + LOGGER.info("Create InterpreterGroup with groupId {} for user {} and note {}", + groupId, user, noteId); + ManagedInterpreterGroup intpGroup = createInterpreterGroup(groupId); + interpreterGroups.put(groupId, intpGroup); + } + return interpreterGroups.get(groupId); + } finally { + interpreterGroupWriteLock.unlock();; } + } + + void removeInterpreterGroup(String groupId) { + this.interpreterGroups.remove(groupId); + } + + ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) { + String groupId = getInterpreterGroupId(user, noteId); try { interpreterGroupReadLock.lock(); - return interpreterGroupRef.get(key); + return interpreterGroups.get(groupId); } finally { - interpreterGroupReadLock.unlock(); + interpreterGroupReadLock.unlock();; } } - public Collection<InterpreterGroup> getAllInterpreterGroups() { + ManagedInterpreterGroup getInterpreterGroup(String groupId) { + return interpreterGroups.get(groupId); + } + + @VisibleForTesting + public ArrayList<ManagedInterpreterGroup> getAllInterpreterGroups() { try { interpreterGroupReadLock.lock(); - return new LinkedList<>(interpreterGroupRef.values()); + return new ArrayList(interpreterGroups.values()); } finally { interpreterGroupReadLock.unlock(); } } - void closeAndRemoveInterpreterGroup(String noteId, String user) { - if (user.equals("anonymous")) { - user = ""; - } - String processKey = getInterpreterProcessKey(user, noteId); - String sessionKey = getInterpreterSessionKey(user, noteId); - List<InterpreterGroup> groupToRemove = new LinkedList<>(); - InterpreterGroup groupItem; - for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) { - if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) { - interpreterGroupWriteLock.lock(); - // TODO(jl): interpreterGroup has two or more sessionKeys inside it. thus we should not - // remove interpreterGroup if it has two or more values. - groupItem = interpreterGroupRef.get(intpKey); - interpreterGroupWriteLock.unlock(); - groupToRemove.add(groupItem); - } - for (InterpreterGroup groupToClose : groupToRemove) { - // TODO(jl): Fix the logic removing session. Now, it's handled into groupToClose.clsose() - groupToClose.close(interpreterGroupRef, intpKey, sessionKey); + Map<String, Object> getEditorFromSettingByClassName(String className) { + for (InterpreterInfo intpInfo : interpreterInfos) { + if (className.equals(intpInfo.getClassName())) { + if (intpInfo.getEditor() == null) { + break; + } + return intpInfo.getEditor(); } - groupToRemove.clear(); } + return DEFAULT_EDITOR; + } - //Remove session because all interpreters in this session are closed - //TODO(jl): Change all code to handle interpreter one by one or all at once - + void closeInterpreters(String user, String noteId) { + ManagedInterpreterGroup interpreterGroup = getInterpreterGroup(user, noteId); + if (interpreterGroup != null) { + String sessionId = getInterpreterSessionId(user, noteId); + interpreterGroup.close(sessionId); + } } - void closeAndRemoveAllInterpreterGroups() { - for (String processKey : new HashSet<>(interpreterGroupRef.keySet())) { - InterpreterGroup interpreterGroup = interpreterGroupRef.get(processKey); - for (String sessionKey : new HashSet<>(interpreterGroup.keySet())) { - interpreterGroup.close(interpreterGroupRef, processKey, sessionKey); - } + public void close() { + LOGGER.info("Close InterpreterSetting: " + name); + for (ManagedInterpreterGroup intpGroup : interpreterGroups.values()) { + intpGroup.close(); } + interpreterGroups.clear(); + this.runtimeInfosToBeCleared = null; + this.infos = null; } - void shutdownAndRemoveAllInterpreterGroups() { - for (InterpreterGroup interpreterGroup : interpreterGroupRef.values()) { - interpreterGroup.shutdown(); + public void setProperties(Object object) { + if (object instanceof StringMap) { + StringMap<String> map = (StringMap) properties; + Properties newProperties = new Properties(); + for (String key : map.keySet()) { + newProperties.put(key, map.get(key)); + } + this.properties = newProperties; + } else { + this.properties = object; } } + public Object getProperties() { return properties; } - public Properties getFlatProperties() { - Properties p = new Properties(); - if (properties != null) { - Map<String, InterpreterProperty> propertyMap = (Map<String, InterpreterProperty>) properties; - for (String key : propertyMap.keySet()) { - InterpreterProperty tmp = propertyMap.get(key); - p.put(tmp.getName() != null ? tmp.getName() : key, - tmp.getValue() != null ? tmp.getValue().toString() : null); - } + @VisibleForTesting + public void setProperty(String name, String value) { + ((Map<String, InterpreterProperty>) properties).put(name, new InterpreterProperty(name, value)); + } + + // This method is supposed to be only called by InterpreterSetting + // but not InterpreterSetting Template + public Properties getJavaProperties() { + Properties jProperties = new Properties(); + Map<String, InterpreterProperty> iProperties = (Map<String, InterpreterProperty>) properties; + for (Map.Entry<String, InterpreterProperty> entry : iProperties.entrySet()) { + jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString()); + } + + if (!jProperties.containsKey("zeppelin.interpreter.output.limit")) { + jProperties.setProperty("zeppelin.interpreter.output.limit", + conf.getInt(ZEPPELIN_INTERPRETER_OUTPUT_LIMIT) + ""); + } + + if (!jProperties.containsKey("zeppelin.interpreter.max.poolsize")) { + jProperties.setProperty("zeppelin.interpreter.max.poolsize", + conf.getInt(ZEPPELIN_INTERPRETER_MAX_POOL_SIZE) + ""); } - return p; + + String interpreterLocalRepoPath = conf.getInterpreterLocalRepoPath(); + //TODO(zjffdu) change it to interpreterDir/{interpreter_name} + jProperties.setProperty("zeppelin.interpreter.localRepo", + interpreterLocalRepoPath + "/" + id); + return jProperties; + } + + public ZeppelinConfiguration getConf() { + return conf; + } + + public void setConf(ZeppelinConfiguration conf) { + this.conf = conf; } public List<Dependency> getDependencies() { - if (dependencies == null) { - return new LinkedList<>(); - } return dependencies; } public void setDependencies(List<Dependency> dependencies) { this.dependencies = dependencies; + loadInterpreterDependencies(); } public InterpreterOption getOption() { - if (option == null) { - option = new InterpreterOption(); - } - return option; } @@ -314,35 +497,32 @@ public class InterpreterSetting { this.option = option; } - public String getPath() { - return path; + public String getInterpreterDir() { + return interpreterDir; } - public void setPath(String path) { - this.path = path; + public void setInterpreterDir(String interpreterDir) { + this.interpreterDir = interpreterDir; } public List<InterpreterInfo> getInterpreterInfos() { return interpreterInfos; } - void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) { - this.interpreterGroupFactory = interpreterGroupFactory; - } - void appendDependencies(List<Dependency> dependencies) { for (Dependency dependency : dependencies) { if (!this.dependencies.contains(dependency)) { this.dependencies.add(dependency); } } + loadInterpreterDependencies(); } void setInterpreterOption(InterpreterOption interpreterOption) { this.option = interpreterOption; } - public void setProperties(Map<String, InterpreterProperty> p) { + public void setProperties(Properties p) { this.properties = p; } @@ -379,6 +559,10 @@ public class InterpreterSetting { this.errorReason = errorReason; } + public void setInterpreterInfos(List<InterpreterInfo> interpreterInfos) { + this.interpreterInfos = interpreterInfos; + } + public void setInfos(Map<String, String> infos) { this.infos = infos; } @@ -415,7 +599,236 @@ public class InterpreterSetting { runtimeInfosToBeCleared = null; } - // For backward compatibility of interpreter.json format after ZEPPELIN-2654 + + //////////////////////////// IMPORTANT //////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////////////////////// + // This is the only place to create interpreters. For now we always create multiple interpreter + // together (one session). We don't support to create single interpreter yet. + List<Interpreter> createInterpreters(String user, String sessionId) { + List<Interpreter> interpreters = new ArrayList<>(); + List<InterpreterInfo> interpreterInfos = getInterpreterInfos(); + for (InterpreterInfo info : interpreterInfos) { + Interpreter interpreter = null; + if (option.isRemote()) { + interpreter = new RemoteInterpreter(getJavaProperties(), sessionId, + info.getClassName(), user); + } else { + interpreter = createLocalInterpreter(info.getClassName()); + } + + if (info.isDefaultInterpreter()) { + interpreters.add(0, interpreter); + } else { + interpreters.add(interpreter); + } + LOGGER.info("Interpreter {} created for user: {}, sessionId: {}", + interpreter.getClassName(), user, sessionId); + } + return interpreters; + } + + // Create Interpreter in ZeppelinServer for non-remote mode + private Interpreter createLocalInterpreter(String className) + throws InterpreterException { + LOGGER.info("Create Local Interpreter {} from {}", className, interpreterDir); + + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + try { + + URLClassLoader ccl = cleanCl.get(interpreterDir); + if (ccl == null) { + // classloader fallback + ccl = URLClassLoader.newInstance(new URL[]{}, oldcl); + } + + boolean separateCL = true; + try { // check if server's classloader has driver already. + Class cls = this.getClass().forName(className); + if (cls != null) { + separateCL = false; + } + } catch (Exception e) { + LOGGER.error("exception checking server classloader driver", e); + } + + URLClassLoader cl; + + if (separateCL == true) { + cl = URLClassLoader.newInstance(new URL[]{}, ccl); + } else { + cl = ccl; + } + Thread.currentThread().setContextClassLoader(cl); + + Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className); + Constructor<Interpreter> constructor = + replClass.getConstructor(new Class[]{Properties.class}); + Interpreter repl = constructor.newInstance(getJavaProperties()); + repl.setClassloaderUrls(ccl.getURLs()); + LazyOpenInterpreter intp = new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl)); + return intp; + } catch (SecurityException e) { + throw new InterpreterException(e); + } catch (NoSuchMethodException e) { + throw new InterpreterException(e); + } catch (IllegalArgumentException e) { + throw new InterpreterException(e); + } catch (InstantiationException e) { + throw new InterpreterException(e); + } catch (IllegalAccessException e) { + throw new InterpreterException(e); + } catch (InvocationTargetException e) { + throw new InterpreterException(e); + } catch (ClassNotFoundException e) { + throw new InterpreterException(e); + } finally { + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + RemoteInterpreterProcess createInterpreterProcess() { + RemoteInterpreterProcess remoteInterpreterProcess = null; + int connectTimeout = + conf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + id; + if (option.isExistingProcess()) { + // TODO(zjffdu) remove the existing process approach seems no one is using this. + // use the existing process + remoteInterpreterProcess = new RemoteInterpreterRunningProcess( + connectTimeout, + remoteInterpreterProcessListener, + appEventListener, + option.getHost(), + option.getPort()); + } else { + // create new remote process + remoteInterpreterProcess = new RemoteInterpreterManagedProcess( + interpreterRunner != null ? interpreterRunner.getPath() : + conf.getInterpreterRemoteRunnerPath(), interpreterDir, localRepoPath, + getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout, + remoteInterpreterProcessListener, appEventListener, group); + } + return remoteInterpreterProcess; + } + + private Map<String, String> getEnvFromInterpreterProperty(Properties property) { + Map<String, String> env = new HashMap<>(); + for (Object key : property.keySet()) { + if (RemoteInterpreterUtils.isEnvString((String) key)) { + env.put((String) key, property.getProperty((String) key)); + } + } + return env; + } + + private List<Interpreter> getOrCreateSession(String user, String noteId) { + ManagedInterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId); + Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user {}, " + + "noteId {}", user, noteId); + String sessionId = getInterpreterSessionId(user, noteId); + return interpreterGroup.getOrCreateSession(user, sessionId); + } + + public Interpreter getDefaultInterpreter(String user, String noteId) { + return getOrCreateSession(user, noteId).get(0); + } + + public Interpreter getInterpreter(String user, String noteId, String replName) { + Preconditions.checkNotNull(noteId, "noteId should be not null"); + Preconditions.checkNotNull(replName, "replName should be not null"); + + String className = getInterpreterClassFromInterpreterSetting(replName); + if (className == null) { + return null; + } + List<Interpreter> interpreters = getOrCreateSession(user, noteId); + for (Interpreter interpreter : interpreters) { + if (className.equals(interpreter.getClassName())) { + return interpreter; + } + } + return null; + } + + private String getInterpreterClassFromInterpreterSetting(String replName) { + Preconditions.checkNotNull(replName, "replName should be not null"); + + for (InterpreterInfo info : interpreterInfos) { + String infoName = info.getName(); + if (null != info.getName() && replName.equals(infoName)) { + return info.getClassName(); + } + } + return null; + } + + private ManagedInterpreterGroup createInterpreterGroup(String groupId) + throws InterpreterException { + AngularObjectRegistry angularObjectRegistry; + ManagedInterpreterGroup interpreterGroup = new ManagedInterpreterGroup(groupId, this); + if (option.isRemote()) { + angularObjectRegistry = + new RemoteAngularObjectRegistry(groupId, angularObjectRegistryListener, interpreterGroup); + } else { + angularObjectRegistry = new AngularObjectRegistry(id, angularObjectRegistryListener); + // TODO(moon) : create distributed resource pool for local interpreters and set + } + + interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); + return interpreterGroup; + } + + private void loadInterpreterDependencies() { + setStatus(Status.DOWNLOADING_DEPENDENCIES); + setErrorReason(null); + Thread t = new Thread() { + public void run() { + try { + // dependencies to prevent library conflict + File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + getId()); + if (localRepoDir.exists()) { + try { + FileUtils.forceDelete(localRepoDir); + } catch (FileNotFoundException e) { + LOGGER.info("A file that does not exist cannot be deleted, nothing to worry", e); + } + } + + // load dependencies + List<Dependency> deps = getDependencies(); + if (deps != null) { + for (Dependency d : deps) { + File destDir = new File( + conf.getRelativeDir(ZeppelinConfiguration.ConfVars.ZEPPELIN_DEP_LOCALREPO)); + + if (d.getExclusions() != null) { + dependencyResolver.load(d.getGroupArtifactVersion(), d.getExclusions(), + new File(destDir, id)); + } else { + dependencyResolver + .load(d.getGroupArtifactVersion(), new File(destDir, id)); + } + } + } + + setStatus(Status.READY); + setErrorReason(null); + } catch (Exception e) { + LOGGER.error(String.format("Error while downloading repos for interpreter group : %s," + + " go to interpreter setting page click on edit and save it again to make " + + "this interpreter work properly. : %s", + getGroup(), e.getLocalizedMessage()), e); + setErrorReason(e.getLocalizedMessage()); + setStatus(Status.ERROR); + } + } + }; + + t.start(); + } + + //TODO(zjffdu) ugly code, should not use JsonObject as parameter. not readable public void convertPermissionsFromUsersToOwners(JsonObject jsonObject) { if (jsonObject != null) { JsonObject option = jsonObject.getAsJsonObject("option"); @@ -434,26 +847,56 @@ public class InterpreterSetting { } // For backward compatibility of interpreter.json format after ZEPPELIN-2403 - public void convertFlatPropertiesToPropertiesWithWidgets() { - StringMap newProperties = new StringMap(); + static Map<String, InterpreterProperty> convertInterpreterProperties(Object properties) { if (properties != null && properties instanceof StringMap) { + Map<String, InterpreterProperty> newProperties = new HashMap<>(); StringMap p = (StringMap) properties; - for (Object o : p.entrySet()) { Map.Entry entry = (Map.Entry) o; if (!(entry.getValue() instanceof StringMap)) { - StringMap newProperty = new StringMap(); - newProperty.put("name", entry.getKey()); - newProperty.put("value", entry.getValue()); - newProperty.put("type", InterpreterPropertyType.TEXTAREA.getValue()); + InterpreterProperty newProperty = new InterpreterProperty( + entry.getKey().toString(), + entry.getValue(), + InterpreterPropertyType.STRING.getValue()); newProperties.put(entry.getKey().toString(), newProperty); } else { // already converted - return; + return (Map<String, InterpreterProperty>) properties; } } - - this.properties = newProperties; + return newProperties; + + } else if (properties instanceof Map) { + Map<String, Object> dProperties = + (Map<String, Object>) properties; + Map<String, InterpreterProperty> newProperties = new HashMap<>(); + for (String key : dProperties.keySet()) { + Object value = dProperties.get(key); + if (value instanceof InterpreterProperty) { + return (Map<String, InterpreterProperty>) properties; + } else if (value instanceof StringMap) { + StringMap stringMap = (StringMap) value; + InterpreterProperty newProperty = new InterpreterProperty( + key, + stringMap.get("value"), + stringMap.containsKey("type") ? stringMap.get("type").toString() : "string"); + + newProperties.put(newProperty.getName(), newProperty); + } else if (value instanceof DefaultInterpreterProperty){ + DefaultInterpreterProperty dProperty = (DefaultInterpreterProperty) value; + InterpreterProperty property = new InterpreterProperty( + key, + dProperty.getValue(), + dProperty.getType() != null ? dProperty.getType() : "string" + // in case user forget to specify type in interpreter-setting.json + ); + newProperties.put(key, property); + } else { + throw new RuntimeException("Can not convert this type of property: " + value.getClass()); + } + } + return newProperties; } + throw new RuntimeException("Can not convert this type: " + properties.getClass()); } }