http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java new file mode 100644 index 0000000..bb90dd8 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.zeppelin.interpreter.remote;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Proxy for an Interpreter instance that runs in a separate process.
 *
 * <p>Every operation is forwarded to the remote side through
 * {@link RemoteInterpreterProcess#callRemoteFunction}, addressed by the pair
 * (sessionId, className). The proxy tracks two independent pieces of local state:
 * whether the remote instance has been created ({@code isCreated}) and whether it has
 * been opened ({@code isOpened}).
 */
public class RemoteInterpreter extends Interpreter {
  private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class);
  private static final Gson gson = new Gson();


  private String className;
  private String sessionId;
  private String userName;
  // Cached result of the remote getFormType call; null until first fetched.
  private FormType formType;

  // Lazily resolved by getOrCreateInterpreterProcess().
  private RemoteInterpreterProcess interpreterProcess;
  // Written under synchronized (this) in open()/internal_create(), read without the lock
  // by the guard checks in the other methods, hence volatile.
  private volatile boolean isOpened = false;
  private volatile boolean isCreated = false;

  /**
   * Remote interpreter and manage interpreter process
   *
   * @param properties interpreter properties, handed to the {@link Interpreter} base class
   * @param sessionId  id of the session this interpreter belongs to
   * @param className  class name of the interpreter to instantiate on the remote side
   * @param userName   user on whose behalf the session and the process are created
   */
  public RemoteInterpreter(Properties properties,
                           String sessionId,
                           String className,
                           String userName) {
    super(properties);
    this.sessionId = sessionId;
    this.className = className;
    this.userName = userName;
  }

  /**
   * @return true once {@link #open()} has completed and {@link #close()} has not been
   *         called since
   */
  public boolean isOpened() {
    return isOpened;
  }

  @Override
  public String getClassName() {
    return className;
  }

  public String getSessionId() {
    return this.sessionId;
  }

  /**
   * Returns the interpreter process of this interpreter's group, resolving it on first use.
   *
   * <p>On the first call the process is taken from the interpreter group; if it is not
   * running yet it is started and its event poller is wired to the process and the group
   * and then started as well. Later calls return the cached instance.
   *
   * @return the process used for all remote calls of this interpreter
   */
  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
    if (this.interpreterProcess != null) {
      return this.interpreterProcess;
    }
    InterpreterGroup intpGroup = getInterpreterGroup();
    this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess();
    // The process object is shared by the interpreters of the group, so starting it is
    // guarded by the process itself rather than by this proxy.
    synchronized (interpreterProcess) {
      if (!interpreterProcess.isRunning()) {
        interpreterProcess.start(userName, false);
        interpreterProcess.getRemoteInterpreterEventPoller()
            .setInterpreterProcess(interpreterProcess);
        interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup);
        interpreterProcess.getRemoteInterpreterEventPoller().start();
      }
    }
    return interpreterProcess;
  }

  /**
   * Opens the remote interpreter if it is not open yet.
   *
   * <p>All interpreters of the session are created remotely first, then this one is opened.
   * The group's angular object registry is pushed to the remote side once per group.
   */
  @Override
  public void open() {
    synchronized (this) {
      if (!isOpened) {
        // Create all the interpreters of the same session first, then open the internal
        // interpreter of this RemoteInterpreter.
        // The reason we create all the interpreters of the session is that some interpreters
        // depend on other interpreters, e.g. PySparkInterpreter depends on SparkInterpreter.
        // also see method Interpreter.getInterpreterInTheSameSessionByClassName
        for (Interpreter interpreter : getInterpreterGroup().getOrCreateSession(
            userName, sessionId)) {
          ((RemoteInterpreter) interpreter).internal_create();
        }

        // Resolve the process through the accessor instead of the raw field: the field is
        // only populated as a side effect of this instance's own internal_create(), so it
        // would still be null if this instance were not part of the list iterated above.
        RemoteInterpreterProcess process = getOrCreateInterpreterProcess();
        process.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
          @Override
          public Void call(Client client) throws Exception {
            LOGGER.info("Open RemoteInterpreter {}", getClassName());
            client.open(sessionId, className);
            // Push angular object loaded from JSON file to remote interpreter
            synchronized (getInterpreterGroup()) {
              if (!getInterpreterGroup().isAngularRegistryPushed()) {
                pushAngularObjectRegistryToRemote(client);
                getInterpreterGroup().setAngularRegistryPushed(true);
              }
            }
            return null;
          }
        });
        isOpened = true;
      }
    }
  }

  /**
   * Creates the interpreter instance on the remote side, at most once per proxy.
   */
  private void internal_create() {
    synchronized (this) {
      if (!isCreated) {
        RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
        interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
          @Override
          public Void call(Client client) throws Exception {
            LOGGER.info("Create RemoteInterpreter {}", getClassName());
            client.createInterpreter(getInterpreterGroup().getId(), sessionId,
                className, (Map) property, userName);
            return null;
          }
        });
        isCreated = true;
      }
    }
  }


  /**
   * Closes the remote interpreter if it is open; otherwise only logs a warning.
   */
  @Override
  public void close() {
    if (isOpened) {
      RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
      interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
        @Override
        public Void call(Client client) throws Exception {
          client.close(sessionId, className);
          return null;
        }
      });
      // Record the close. Without this the proxy keeps reporting itself as opened, so a
      // later open() is a no-op and cancel()/getProgress()/completion()/getStatus() keep
      // issuing calls against an interpreter that has already been closed remotely.
      isOpened = false;
    } else {
      LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className);
    }
  }

  /**
   * Runs {@code st} on the remote interpreter and copies the resulting paragraph config and
   * GUI state back into {@code context}.
   *
   * <p>For {@code FormType.NATIVE} the local GUI is replaced by the remote params and forms;
   * for {@code FormType.SIMPLE} the remote forms and params are merged into the local ones.
   *
   * @param st      the text to interpret
   * @param context paragraph context; its config and GUI are updated in place
   * @return the remote result converted to an {@link InterpreterResult}
   */
  @Override
  public InterpreterResult interpret(final String st, final InterpreterContext context) {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("st:\n{}", st);
    }

    final FormType form = getFormType();
    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
    InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
        .getInterpreterContextRunnerPool();
    List<InterpreterContextRunner> runners = context.getRunners();
    if (runners != null && runners.size() != 0) {
      // assume all runners in this InterpreterContext have the same note id
      String noteId = runners.get(0).getNoteId();

      interpreterContextRunnerPool.clear(noteId);
      interpreterContextRunnerPool.addAll(noteId, runners);
    }
    return interpreterProcess.callRemoteFunction(
        new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
          @Override
          public InterpreterResult call(Client client) throws Exception {

            RemoteInterpreterResult remoteResult = client.interpret(
                sessionId, className, st, convert(context));
            Map<String, Object> remoteConfig = gson.fromJson(
                remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
                }.getType());
            context.getConfig().clear();
            // Gson yields null for null/empty JSON; putAll(null) would throw.
            if (remoteConfig != null) {
              context.getConfig().putAll(remoteConfig);
            }
            GUI currentGUI = context.getGui();
            if (form == FormType.NATIVE) {
              GUI remoteGui = GUI.fromJson(remoteResult.getGui());
              currentGUI.clear();
              currentGUI.setParams(remoteGui.getParams());
              currentGUI.setForms(remoteGui.getForms());
            } else if (form == FormType.SIMPLE) {
              final Map<String, Input> currentForms = currentGUI.getForms();
              final Map<String, Object> currentParams = currentGUI.getParams();
              final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
              final Map<String, Input> remoteForms = remoteGUI.getForms();
              final Map<String, Object> remoteParams = remoteGUI.getParams();
              currentForms.putAll(remoteForms);
              currentParams.putAll(remoteParams);
            }

            InterpreterResult result = convert(remoteResult);
            return result;
          }
        }
    );

  }

  /**
   * Forwards a cancel request for {@code context} to the remote interpreter.
   * Does nothing except logging a warning when the interpreter is not opened.
   */
  @Override
  public void cancel(final InterpreterContext context) {
    if (!isOpened) {
      LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className);
      return;
    }
    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
    interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
      @Override
      public Void call(Client client) throws Exception {
        client.cancel(sessionId, className, convert(context));
        return null;
      }
    });
  }

  /**
   * Returns the form type of the remote interpreter, opening the interpreter first if
   * necessary. The value is cached after the first successful remote call.
   */
  @Override
  public FormType getFormType() {
    if (formType != null) {
      return formType;
    }

    // it is possible to call getFormType before it is opened
    synchronized (this) {
      if (!isOpened) {
        open();
      }
    }
    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
    FormType type = interpreterProcess.callRemoteFunction(
        new RemoteInterpreterProcess.RemoteFunction<FormType>() {
          @Override
          public FormType call(Client client) throws Exception {
            formType = FormType.valueOf(client.getFormType(sessionId, className));
            return formType;
          }
        });
    return type;
  }

  /**
   * @return the progress reported by the remote interpreter for {@code context}, or 0 when
   *         the interpreter is not opened
   */
  @Override
  public int getProgress(final InterpreterContext context) {
    if (!isOpened) {
      LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className);
      return 0;
    }
    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
    return interpreterProcess.callRemoteFunction(
        new RemoteInterpreterProcess.RemoteFunction<Integer>() {
          @Override
          public Integer call(Client client) throws Exception {
            return client.getProgress(sessionId, className, convert(context));
          }
        });
  }


  /**
   * @return completion candidates from the remote interpreter for {@code buf} at
   *         {@code cursor}, or an empty list when the interpreter is not opened
   */
  @Override
  public List<InterpreterCompletion> completion(final String buf, final int cursor,
                                                final InterpreterContext interpreterContext) {
    if (!isOpened) {
      LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className);
      return new ArrayList<>();
    }
    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
    return interpreterProcess.callRemoteFunction(
        new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
          @Override
          public List<InterpreterCompletion> call(Client client) throws Exception {
            return client.completion(sessionId, className, buf, cursor,
                convert(interpreterContext));
          }
        });
  }

  /**
   * @param jobId id of the job to query
   * @return the status string reported by the remote side, or the name of
   *         {@code Job.Status.UNKNOWN} when the interpreter is not opened
   */
  public String getStatus(final String jobId) {
    if (!isOpened) {
      LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className);
      return Job.Status.UNKNOWN.name();
    }
    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
    return interpreterProcess.callRemoteFunction(
        new RemoteInterpreterProcess.RemoteFunction<String>() {
          @Override
          public String call(Client client) throws Exception {
            return client.getStatus(sessionId, jobId);
          }
        });
  }

  //TODO(zjffdu) Share the Scheduler in the same session or in the same InterpreterGroup ?
  /**
   * Returns the remote scheduler for this interpreter's session. The scheduler is keyed by
   * session id; its max concurrency comes from the {@code zeppelin.interpreter.max.poolsize}
   * property, falling back to the ZeppelinConfiguration default.
   */
  @Override
  public Scheduler getScheduler() {
    int maxConcurrency = Integer.parseInt(
        property.getProperty("zeppelin.interpreter.max.poolsize",
            ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + ""));
    return SchedulerFactory.singleton().createOrGetRemoteScheduler(
        RemoteInterpreter.class.getName() + "-" + sessionId,
        sessionId, this, maxConcurrency);
  }

  /**
   * Builds the Thrift context from a local one. Authentication info, config, GUI and
   * runners are passed as JSON strings.
   */
  private RemoteInterpreterContext convert(InterpreterContext ic) {
    return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
        ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
        gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
  }

  /**
   * Builds a local result from the Thrift one, copying the code and every message.
   */
  private InterpreterResult convert(RemoteInterpreterResult result) {
    InterpreterResult r = new InterpreterResult(
        InterpreterResult.Code.valueOf(result.getCode()));

    for (RemoteInterpreterResultMessage m : result.getMsg()) {
      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
    }

    return r;
  }

  /**
   * Push local angular object registry to
   * remote interpreter. This method should be
   * called ONLY once when the first Interpreter is created
   *
   * @param client Thrift client used for the push
   * @throws TException if the remote call fails
   */
  private void pushAngularObjectRegistryToRemote(Client client) throws TException {
    final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
        .getAngularObjectRegistry();
    if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
      final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
          .getRegistry();
      LOGGER.info("Push local angular object registry from ZeppelinServer to" +
          " remote interpreter group {}", this.getInterpreterGroup().getId());
      final java.lang.reflect.Type registryType = new TypeToken<Map<String,
          Map<String, AngularObject>>>() {
      }.getType();
      client.angularRegistryPush(gson.toJson(registry, registryType));
    }
  }

  @Override
  public String toString() {
    return "RemoteInterpreter_" + className + "_" + sessionId;
  }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 26c9d79..f3bce2f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner; import org.apache.zeppelin.resource.Resource; @@ -38,6 +39,7 @@ import org.apache.zeppelin.resource.ResourceSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; @@ -84,7 +86,6 @@ public class RemoteInterpreterEventPoller extends Thread { @Override public void run() { - Client client = null; AppendOutputRunner runner = new AppendOutputRunner(listener); ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay( runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); @@ -100,26 +101,14 @@ public class RemoteInterpreterEventPoller extends Thread { continue; } - 
try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - logger.error("Can't get RemoteInterpreterEvent", e1); - waitQuietly(); - continue; - } - - RemoteInterpreterEvent event = null; - boolean broken = false; - try { - event = client.getEvent(); - } catch (TException e) { - broken = true; - logger.error("Can't get RemoteInterpreterEvent", e); - waitQuietly(); - continue; - } finally { - interpreterProcess.releaseClient(client, broken); - } + RemoteInterpreterEvent event = interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<RemoteInterpreterEvent>() { + @Override + public RemoteInterpreterEvent call(Client client) throws Exception { + return client.getEvent(); + } + } + ); AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry(); @@ -286,10 +275,7 @@ public class RemoteInterpreterEventPoller extends Thread { boolean broken = false; final Gson gson = new Gson(); final String eventOwnerKey = reqResourceBody.getOwnerKey(); - Client interpreterServerMain = null; try { - interpreterServerMain = interpreterProcess.getClient(); - final Client eventClient = interpreterServerMain; if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) { final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>(); @@ -308,7 +294,6 @@ public class RemoteInterpreterEventPoller extends Thread { @Override public void onFinished(Object resultObject) { - boolean clientBroken = false; if (resultObject != null && resultObject instanceof List) { List<InterpreterContextRunner> runnerList = (List<InterpreterContextRunner>) resultObject; @@ -324,15 +309,15 @@ public class RemoteInterpreterEventPoller extends Thread { resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS); resResource.setData(remoteRunners); - try { - eventClient.onReceivedZeppelinResource(resResource.toJson()); - } catch (Exception e) { - clientBroken = true; - logger.error("Can't 
get RemoteInterpreterEvent", e); - waitQuietly(); - } finally { - interpreterProcess.releaseClient(eventClient, clientBroken); - } + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.onReceivedZeppelinResource(resResource.toJson()); + return null; + } + } + ); } } @@ -346,39 +331,32 @@ public class RemoteInterpreterEventPoller extends Thread { reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent); } } catch (Exception e) { - broken = true; logger.error("Can't get RemoteInterpreterEvent", e); waitQuietly(); - } finally { - interpreterProcess.releaseClient(interpreterServerMain, broken); } } - private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) { - Client client = null; - boolean broken = false; - try { - client = interpreterProcess.getClient(); - List<String> resourceList = new LinkedList<>(); - Gson gson = new Gson(); - for (Resource r : resourceSet) { - resourceList.add(gson.toJson(r)); - } - client.resourcePoolResponseGetAll(resourceList); - } catch (Exception e) { - logger.error(e.getMessage(), e); - broken = true; - } finally { - if (client != null) { - interpreterProcess.releaseClient(client, broken); - } - } + private void sendResourcePoolResponseGetAll(final ResourceSet resourceSet) { + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + List<String> resourceList = new LinkedList<>(); + for (Resource r : resourceSet) { + resourceList.add(r.toJson()); + } + client.resourcePoolResponseGetAll(resourceList); + return null; + } + } + ); } private ResourceSet getAllResourcePoolExcept() { ResourceSet resourceSet = new ResourceSet(); - for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) { + for (InterpreterGroup intpGroup : interpreterGroup.getInterpreterSetting() + 
.getInterpreterSettingManager().getAllInterpreterGroup()) { if (intpGroup.getId().equals(interpreterGroup.getId())) { continue; } @@ -390,115 +368,94 @@ public class RemoteInterpreterEventPoller extends Thread { resourceSet.addAll(localPool.getAll()); } } else if (interpreterProcess.isRunning()) { - Client client = null; - boolean broken = false; - try { - client = remoteInterpreterProcess.getClient(); - List<String> resourceList = client.resourcePoolGetAll(); - Gson gson = new Gson(); - for (String res : resourceList) { - resourceSet.add(Resource.fromJson(res)); - } - } catch (Exception e) { - logger.error(e.getMessage(), e); - broken = true; - } finally { - if (client != null) { - intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken); - } + List<String> resourceList = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<List<String>>() { + @Override + public List<String> call(Client client) throws Exception { + return client.resourcePoolGetAll(); + } + } + ); + for (String res : resourceList) { + resourceSet.add(Resource.fromJson(res)); } } } return resourceSet; } - private void sendResourceResponseGet(ResourceId resourceId, Object o) { - Client client = null; - boolean broken = false; - try { - client = interpreterProcess.getClient(); - Gson gson = new Gson(); - String rid = gson.toJson(resourceId); - ByteBuffer obj; - if (o == null) { - obj = ByteBuffer.allocate(0); - } else { - obj = Resource.serializeObject(o); - } - client.resourceResponseGet(rid, obj); - } catch (Exception e) { - logger.error(e.getMessage(), e); - broken = true; - } finally { - if (client != null) { - interpreterProcess.releaseClient(client, broken); - } - } + private void sendResourceResponseGet(final ResourceId resourceId, final Object o) { + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + String rid = resourceId.toJson(); 
+ ByteBuffer obj; + if (o == null) { + obj = ByteBuffer.allocate(0); + } else { + obj = Resource.serializeObject(o); + } + client.resourceResponseGet(rid, obj); + return null; + } + } + ); } - private Object getResource(ResourceId resourceId) { - InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId( - resourceId.getResourcePoolId()); + private Object getResource(final ResourceId resourceId) { + InterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting() + .getInterpreterSettingManager() + .getInterpreterGroupById(resourceId.getResourcePoolId()); if (intpGroup == null) { return null; } RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); - if (remoteInterpreterProcess == null) { - ResourcePool localPool = intpGroup.getResourcePool(); - if (localPool != null) { - return localPool.get(resourceId.getName()); - } - } else if (interpreterProcess.isRunning()) { - Client client = null; - boolean broken = false; - try { - client = remoteInterpreterProcess.getClient(); - ByteBuffer res = client.resourceGet( - resourceId.getNoteId(), - resourceId.getParagraphId(), - resourceId.getName()); - Object o = Resource.deserializeObject(res); - return o; - } catch (Exception e) { - logger.error(e.getMessage(), e); - broken = true; - } finally { - if (client != null) { - intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken); + ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { + @Override + public ByteBuffer call(Client client) throws Exception { + return client.resourceGet( + resourceId.getNoteId(), + resourceId.getParagraphId(), + resourceId.getName()); + } } - } - } - return null; - } + ); - public void sendInvokeMethodResult(InvokeResourceMethodEventMessage message, Object o) { - Client client = null; - boolean broken = false; try { - client = interpreterProcess.getClient(); - Gson gson = new Gson(); - String invokeMessage = 
gson.toJson(message); - ByteBuffer obj; - if (o == null) { - obj = ByteBuffer.allocate(0); - } else { - obj = Resource.serializeObject(o); - } - client.resourceResponseInvokeMethod(invokeMessage, obj); + Object o = Resource.deserializeObject(buffer); + return o; } catch (Exception e) { logger.error(e.getMessage(), e); - broken = true; - } finally { - if (client != null) { - interpreterProcess.releaseClient(client, broken); - } } + return null; } - private Object invokeResourceMethod(InvokeResourceMethodEventMessage message) { - ResourceId resourceId = message.resourceId; - InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId( - resourceId.getResourcePoolId()); + public void sendInvokeMethodResult(final InvokeResourceMethodEventMessage message, + final Object o) { + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + String invokeMessage = message.toJson(); + ByteBuffer obj; + if (o == null) { + obj = ByteBuffer.allocate(0); + } else { + obj = Resource.serializeObject(o); + } + client.resourceResponseInvokeMethod(invokeMessage, obj); + return null; + } + } + ); + } + + private Object invokeResourceMethod(final InvokeResourceMethodEventMessage message) { + final ResourceId resourceId = message.resourceId; + InterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting() + .getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId()); if (intpGroup == null) { return null; } @@ -529,25 +486,25 @@ public class RemoteInterpreterEventPoller extends Thread { return null; } } else if (interpreterProcess.isRunning()) { - Client client = null; - boolean broken = false; + ByteBuffer res = interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { + @Override + public ByteBuffer call(Client client) throws Exception { + return client.resourceInvokeMethod( + resourceId.getNoteId(), 
+ resourceId.getParagraphId(), + resourceId.getName(), + message.toJson()); + } + } + ); + try { - client = remoteInterpreterProcess.getClient(); - ByteBuffer res = client.resourceInvokeMethod( - resourceId.getNoteId(), - resourceId.getParagraphId(), - resourceId.getName(), - gson.toJson(message)); - Object o = Resource.deserializeObject(res); - return o; + return Resource.deserializeObject(res); } catch (Exception e) { logger.error(e.getMessage(), e); - broken = true; - } finally { - if (client != null) { - intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken); - } } + return null; } return null; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java new file mode 100644 index 0000000..19356fb --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.commons.exec.*; +import org.apache.commons.exec.environment.EnvironmentUtils; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; + +/** + * This class manages start / stop of remote interpreter process + */ +public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess + implements ExecuteResultHandler { + private static final Logger logger = LoggerFactory.getLogger( + RemoteInterpreterManagedProcess.class); + private final String interpreterRunner; + + private DefaultExecutor executor; + private ExecuteWatchdog watchdog; + boolean running = false; + private int port = -1; + private final String interpreterDir; + private final String localRepoDir; + private final String interpreterGroupName; + + private Map<String, String> env; + + public RemoteInterpreterManagedProcess( + String intpRunner, + String intpDir, + String localRepoDir, + Map<String, String> env, + int connectTimeout, + RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener, + String interpreterGroupName) { + super(new RemoteInterpreterEventPoller(listener, appListener), + connectTimeout); + this.interpreterRunner = intpRunner; + this.env = env; + this.interpreterDir = intpDir; + this.localRepoDir = localRepoDir; + this.interpreterGroupName = interpreterGroupName; + } + + RemoteInterpreterManagedProcess(String intpRunner, + String intpDir, + String localRepoDir, + Map<String, String> env, + RemoteInterpreterEventPoller remoteInterpreterEventPoller, + int 
connectTimeout, + String interpreterGroupName) { + super(remoteInterpreterEventPoller, + connectTimeout); + this.interpreterRunner = intpRunner; + this.env = env; + this.interpreterDir = intpDir; + this.localRepoDir = localRepoDir; + this.interpreterGroupName = interpreterGroupName; + } + + @Override + public String getHost() { + return "localhost"; + } + + @Override + public int getPort() { + return port; + } + + @Override + public void start(String userName, Boolean isUserImpersonate) { + // start server process + try { + port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + logger.info("Choose port {} for RemoteInterpreterProcess", port); + } catch (IOException e1) { + throw new InterpreterException(e1); + } + + CommandLine cmdLine = CommandLine.parse(interpreterRunner); + cmdLine.addArgument("-d", false); + cmdLine.addArgument(interpreterDir, false); + cmdLine.addArgument("-p", false); + cmdLine.addArgument(Integer.toString(port), false); + if (isUserImpersonate && !userName.equals("anonymous")) { + cmdLine.addArgument("-u", false); + cmdLine.addArgument(userName, false); + } + cmdLine.addArgument("-l", false); + cmdLine.addArgument(localRepoDir, false); + cmdLine.addArgument("-g", false); + cmdLine.addArgument(interpreterGroupName, false); + + executor = new DefaultExecutor(); + + ByteArrayOutputStream cmdOut = new ByteArrayOutputStream(); + ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger); + processOutput.setOutputStream(cmdOut); + + executor.setStreamHandler(new PumpStreamHandler(processOutput)); + watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); + executor.setWatchdog(watchdog); + + try { + Map procEnv = EnvironmentUtils.getProcEnvironment(); + procEnv.putAll(env); + + logger.info("Run interpreter process {}", cmdLine); + executor.execute(cmdLine, procEnv, this); + running = true; + } catch (IOException e) { + running = false; + throw new InterpreterException(e); + } + + + long startTime = 
System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < getConnectTimeout()) { + if (!running) { + try { + cmdOut.flush(); + } catch (IOException e) { + // nothing to do + } + throw new InterpreterException(new String(cmdOut.toByteArray())); + } + + try { + if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) { + break; + } else { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + logger.error("Exception in RemoteInterpreterProcess while synchronized reference " + + "Thread.sleep", e); + } + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Remote interpreter not yet accessible at localhost:" + port); + } + } + } + processOutput.setOutputStream(null); + } + + public void stop() { + if (isRunning()) { + logger.info("kill interpreter process"); + try { + callRemoteFunction(new RemoteFunction<Void>() { + @Override + public Void call(RemoteInterpreterService.Client client) throws Exception { + client.shutdown(); + return null; + } + }); + } catch (Exception e) { + logger.warn("ignore the exception when shutting down"); + } + watchdog.destroyProcess(); + } + + executor = null; + watchdog = null; + running = false; + logger.info("Remote process terminated"); + } + + @Override + public void onProcessComplete(int exitValue) { + logger.info("Interpreter process exited {}", exitValue); + running = false; + + } + + @Override + public void onProcessFailed(ExecuteException e) { + logger.info("Interpreter process failed {}", e); + running = false; + } + + public boolean isRunning() { + return running; + } + + private static class ProcessLogOutputStream extends LogOutputStream { + + private Logger logger; + OutputStream out; + + public ProcessLogOutputStream(Logger logger) { + this.logger = logger; + } + + @Override + protected void processLine(String s, int i) { + this.logger.debug(s); + } + + @Override + public void write(byte [] b) throws IOException { + super.write(b); + + if (out != 
null) { + synchronized (this) { + if (out != null) { + out.write(b); + } + } + } + } + + @Override + public void write(byte [] b, int offset, int len) throws IOException { + super.write(b, offset, len); + + if (out != null) { + synchronized (this) { + if (out != null) { + out.write(b, offset, len); + } + } + } + } + + public void setOutputStream(OutputStream out) { + synchronized (this) { + this.out = out; + } + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 1d48a1e..a78088c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -20,10 +20,13 @@ import com.google.gson.Gson; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.thrift.TException; import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -32,9 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger; public abstract class RemoteInterpreterProcess { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); - // number of sessions that are attached to this process - private final AtomicInteger referenceCount; - 
private GenericObjectPool<Client> clientPool; private final RemoteInterpreterEventPoller remoteInterpreterEventPoller; private final InterpreterContextRunnerPool interpreterContextRunnerPool; @@ -46,16 +46,20 @@ public abstract class RemoteInterpreterProcess { ApplicationEventListener appListener) { this(new RemoteInterpreterEventPoller(listener, appListener), connectTimeout); + this.remoteInterpreterEventPoller.setInterpreterProcess(this); } RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller, int connectTimeout) { this.interpreterContextRunnerPool = new InterpreterContextRunnerPool(); - referenceCount = new AtomicInteger(0); this.remoteInterpreterEventPoller = remoteInterpreterEventPoller; this.connectTimeout = connectTimeout; } + public RemoteInterpreterEventPoller getRemoteInterpreterEventPoller() { + return remoteInterpreterEventPoller; + } + public abstract String getHost(); public abstract int getPort(); public abstract void start(String userName, Boolean isUserImpersonate); @@ -66,37 +70,18 @@ public abstract class RemoteInterpreterProcess { return connectTimeout; } - public int reference(InterpreterGroup interpreterGroup, String userName, - Boolean isUserImpersonate) { - synchronized (referenceCount) { - if (!isRunning()) { - start(userName, isUserImpersonate); - } - - if (clientPool == null) { - clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort())); - clientPool.setTestOnBorrow(true); - - remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup); - remoteInterpreterEventPoller.setInterpreterProcess(this); - remoteInterpreterEventPoller.start(); - } - return referenceCount.incrementAndGet(); - } - } - - public Client getClient() throws Exception { + public synchronized Client getClient() throws Exception { if (clientPool == null || clientPool.isClosed()) { - return null; + clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort())); } return clientPool.borrowObject(); } - 
public void releaseClient(Client client) { + private void releaseClient(Client client) { releaseClient(client, false); } - public void releaseClient(Client client, boolean broken) { + private void releaseClient(Client client, boolean broken) { if (broken) { releaseBrokenClient(client); } else { @@ -108,7 +93,7 @@ public abstract class RemoteInterpreterProcess { } } - public void releaseBrokenClient(Client client) { + private void releaseBrokenClient(Client client) { try { clientPool.invalidateObject(client); } catch (Exception e) { @@ -116,90 +101,6 @@ public abstract class RemoteInterpreterProcess { } } - public int dereference() { - synchronized (referenceCount) { - int r = referenceCount.decrementAndGet(); - if (r == 0) { - logger.info("shutdown interpreter process"); - remoteInterpreterEventPoller.shutdown(); - - // first try shutdown - Client client = null; - try { - client = getClient(); - client.shutdown(); - } catch (Exception e) { - // safely ignore exception while client.shutdown() may terminates remote process - logger.info("Exception in RemoteInterpreterProcess while synchronized dereference, can " + - "safely ignore exception while client.shutdown() may terminates remote process"); - logger.debug(e.getMessage(), e); - } finally { - if (client != null) { - // no longer used - releaseBrokenClient(client); - } - } - - clientPool.clear(); - clientPool.close(); - - // wait for some time (connectTimeout) and force kill - // remote process server.serve() loop is not always finishing gracefully - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < connectTimeout) { - if (this.isRunning()) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - logger.error("Exception in RemoteInterpreterProcess while synchronized dereference " + - "Thread.sleep", e); - } - } else { - break; - } - } - } - return r; - } - } - - public int referenceCount() { - synchronized (referenceCount) { - return referenceCount.get(); - 
} - } - - public int getNumActiveClient() { - if (clientPool == null) { - return 0; - } else { - return clientPool.getNumActive(); - } - } - - public int getNumIdleClient() { - if (clientPool == null) { - return 0; - } else { - return clientPool.getNumIdle(); - } - } - - public void setMaxPoolSize(int size) { - if (clientPool != null) { - //Size + 2 for progress poller , cancel operation - clientPool.setMaxTotal(size + 2); - } - } - - public int getMaxPoolSize() { - if (clientPool != null) { - return clientPool.getMaxTotal(); - } else { - return 0; - } - } - /** * Called when angular object is updated in client side to propagate * change to the remote process @@ -239,4 +140,33 @@ public abstract class RemoteInterpreterProcess { public InterpreterContextRunnerPool getInterpreterContextRunnerPool() { return interpreterContextRunnerPool; } + + public <T> T callRemoteFunction(RemoteFunction<T> func) { + Client client = null; + boolean broken = false; + try { + client = getClient(); + if (client != null) { + return func.call(client); + } + } catch (TException e) { + broken = true; + throw new InterpreterException(e); + } catch (Exception e1) { + throw new InterpreterException(e1); + } finally { + if (client != null) { + releaseClient(client, broken); + } + } + return null; + } + + /** + * + * @param <T> + */ + public interface RemoteFunction<T> { + T call(Client client) throws Exception; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java new file mode 100644 index 0000000..bb176be --- /dev/null +++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class connects to existing process + */ +public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { + private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); + private final String host; + private final int port; + + public RemoteInterpreterRunningProcess( + int connectTimeout, + RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener, + String host, + int port + ) { + super(connectTimeout, listener, appListener); + this.host = host; + this.port = port; + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getPort() { + return port; + } + + @Override + public void start(String userName, Boolean isUserImpersonate) { + // assume process is externally managed. 
nothing to do + } + + @Override + public void stop() { + // assume process is externally managed. nothing to do + } + + @Override + public boolean isRunning() { + return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 3853468..3d8123e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -106,6 +106,7 @@ public class RemoteInterpreterServer @Override public void shutdown() throws TException { + logger.info("Shutting down..."); eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT); if (interpreterGroup != null) { interpreterGroup.close(); @@ -159,7 +160,7 @@ public class RemoteInterpreterServer } @Override - public void createInterpreter(String interpreterGroupId, String sessionKey, String + public void createInterpreter(String interpreterGroupId, String sessionId, String className, Map<String, String> properties, String userName) throws TException { if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); @@ -190,20 +191,11 @@ public class RemoteInterpreterServer replClass.getConstructor(new Class[] {Properties.class}); Interpreter repl = constructor.newInstance(p); repl.setClassloaderUrls(new URL[]{}); - - synchronized (interpreterGroup) { - List<Interpreter> interpreters = interpreterGroup.get(sessionKey); - if (interpreters == null) { - interpreters = 
new LinkedList<>(); - interpreterGroup.put(sessionKey, interpreters); - } - - interpreters.add(new LazyOpenInterpreter(repl)); - } - logger.info("Instantiate interpreter {}", className); repl.setInterpreterGroup(interpreterGroup); repl.setUserName(userName); + + interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(repl), sessionId); } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { @@ -237,13 +229,13 @@ public class RemoteInterpreterServer } } - protected Interpreter getInterpreter(String sessionKey, String className) throws TException { + protected Interpreter getInterpreter(String sessionId, String className) throws TException { if (interpreterGroup == null) { throw new TException( new InterpreterException("Interpreter instance " + className + " not created")); } synchronized (interpreterGroup) { - List<Interpreter> interpreters = interpreterGroup.get(sessionKey); + List<Interpreter> interpreters = interpreterGroup.get(sessionId); if (interpreters == null) { throw new TException( new InterpreterException("Interpreter " + className + " not initialized")); @@ -259,19 +251,20 @@ public class RemoteInterpreterServer } @Override - public void open(String noteId, String className) throws TException { - Interpreter intp = getInterpreter(noteId, className); + public void open(String sessionId, String className) throws TException { + logger.info(String.format("Open Interpreter %s for session %s ", className, sessionId)); + Interpreter intp = getInterpreter(sessionId, className); intp.open(); } @Override - public void close(String sessionKey, String className) throws TException { + public void close(String sessionId, String className) throws TException { // unload all applications for (String appId : runningApplications.keySet()) { RunningApplication appInfo = runningApplications.get(appId); // see 
NoteInterpreterLoader.SHARED_SESSION - if (appInfo.noteId.equals(sessionKey) || sessionKey.equals("shared_session")) { + if (appInfo.noteId.equals(sessionId) || sessionId.equals("shared_session")) { try { logger.info("Unload App {} ", appInfo.pkg.getName()); appInfo.app.unload(); @@ -286,7 +279,7 @@ public class RemoteInterpreterServer // close interpreters List<Interpreter> interpreters; synchronized (interpreterGroup) { - interpreters = interpreterGroup.get(sessionKey); + interpreters = interpreterGroup.get(sessionId); } if (interpreters != null) { Iterator<Interpreter> it = interpreters.iterator(); @@ -322,7 +315,6 @@ public class RemoteInterpreterServer intp, st, context); - scheduler.submit(job); while (!job.isTerminated()) { @@ -566,30 +558,34 @@ public class RemoteInterpreterServer } @Override - public int getProgress(String noteId, String className, + public int getProgress(String sessionId, String className, RemoteInterpreterContext interpreterContext) throws TException { Integer manuallyProvidedProgress = progressMap.get(interpreterContext.getParagraphId()); if (manuallyProvidedProgress != null) { return manuallyProvidedProgress; } else { - Interpreter intp = getInterpreter(noteId, className); + Interpreter intp = getInterpreter(sessionId, className); + if (intp == null) { + throw new TException("No interpreter {} existed for session {}".format( + className, sessionId)); + } return intp.getProgress(convert(interpreterContext, null)); } } @Override - public String getFormType(String noteId, String className) throws TException { - Interpreter intp = getInterpreter(noteId, className); + public String getFormType(String sessionId, String className) throws TException { + Interpreter intp = getInterpreter(sessionId, className); return intp.getFormType().toString(); } @Override - public List<InterpreterCompletion> completion(String noteId, + public List<InterpreterCompletion> completion(String sessionId, String className, String buf, int cursor, 
RemoteInterpreterContext remoteInterpreterContext) throws TException { - Interpreter intp = getInterpreter(noteId, className); + Interpreter intp = getInterpreter(sessionId, className); List completion = intp.completion(buf, cursor, convert(remoteInterpreterContext, null)); return completion; } @@ -766,16 +762,16 @@ public class RemoteInterpreterServer } @Override - public String getStatus(String sessionKey, String jobId) + public String getStatus(String sessionId, String jobId) throws TException { if (interpreterGroup == null) { - return "Unknown"; + return Status.UNKNOWN.name(); } synchronized (interpreterGroup) { - List<Interpreter> interpreters = interpreterGroup.get(sessionKey); + List<Interpreter> interpreters = interpreterGroup.get(sessionId); if (interpreters == null) { - return "Unknown"; + return Status.UNKNOWN.name(); } for (Interpreter intp : interpreters) { @@ -792,7 +788,7 @@ public class RemoteInterpreterServer } } } - return "Unknown"; + return Status.UNKNOWN.name(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java deleted file mode 100644 index b26995a..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.resource; - -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; -import org.slf4j.Logger; - -import java.util.List; - -/** - * Utilities for ResourcePool - */ -public class ResourcePoolUtils { - static Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolUtils.class); - - public static ResourceSet getAllResources() { - return getAllResourcesExcept(null); - } - - public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) { - ResourceSet resourceSet = new ResourceSet(); - for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) { - if (interpreterGroupExcludsion != null && - intpGroup.getId().equals(interpreterGroupExcludsion)) { - continue; - } - - RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); - if (remoteInterpreterProcess == null) { - ResourcePool localPool = intpGroup.getResourcePool(); - if (localPool != null) { - resourceSet.addAll(localPool.getAll()); - } - } else if (remoteInterpreterProcess.isRunning()) { - RemoteInterpreterService.Client client = null; - boolean broken = false; - try { - client = remoteInterpreterProcess.getClient(); - if (client == null) { - // remote interpreter may not started yet or terminated. 
- continue; - } - List<String> resourceList = client.resourcePoolGetAll(); - for (String res : resourceList) { - resourceSet.add(Resource.fromJson(res)); - } - } catch (Exception e) { - logger.error(e.getMessage(), e); - broken = true; - } finally { - if (client != null) { - intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken); - } - } - } - } - return resourceSet; - } - - public static void removeResourcesBelongsToNote(String noteId) { - removeResourcesBelongsToParagraph(noteId, null); - } - - public static void removeResourcesBelongsToParagraph(String noteId, String paragraphId) { - for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) { - ResourceSet resourceSet = new ResourceSet(); - RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); - if (remoteInterpreterProcess == null) { - ResourcePool localPool = intpGroup.getResourcePool(); - if (localPool != null) { - resourceSet.addAll(localPool.getAll()); - } - if (noteId != null) { - resourceSet = resourceSet.filterByNoteId(noteId); - } - if (paragraphId != null) { - resourceSet = resourceSet.filterByParagraphId(paragraphId); - } - - for (Resource r : resourceSet) { - localPool.remove( - r.getResourceId().getNoteId(), - r.getResourceId().getParagraphId(), - r.getResourceId().getName()); - } - } else if (remoteInterpreterProcess.isRunning()) { - RemoteInterpreterService.Client client = null; - boolean broken = false; - try { - client = remoteInterpreterProcess.getClient(); - List<String> resourceList = client.resourcePoolGetAll(); - for (String res : resourceList) { - resourceSet.add(Resource.fromJson(res)); - } - - if (noteId != null) { - resourceSet = resourceSet.filterByNoteId(noteId); - } - if (paragraphId != null) { - resourceSet = resourceSet.filterByParagraphId(paragraphId); - } - - for (Resource r : resourceSet) { - client.resourceRemove( - r.getResourceId().getNoteId(), - r.getResourceId().getParagraphId(), - r.getResourceId().getName()); - } - 
} catch (Exception e) { - logger.error(e.getMessage(), e); - broken = true; - } finally { - if (client != null) { - intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken); - } - } - } - } - } -} - http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index d0025d8..191902a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -41,6 +41,7 @@ public abstract class Job { /** * Job status. * + * UNKNOWN - Job is not found in remote * READY - Job is not running, ready to run. * PENDING - Job is submitted to scheduler. but not running yet * RUNNING - Job is running. @@ -48,8 +49,8 @@ public abstract class Job { * ERROR - Job finished run. 
with error * ABORT - Job finished by abort */ - public static enum Status { - READY, PENDING, RUNNING, FINISHED, ERROR, ABORT; + public enum Status { + UNKNOWN, READY, PENDING, RUNNING, FINISHED, ERROR, ABORT; public boolean isReady() { return this == READY; @@ -70,14 +71,14 @@ public abstract class Job { Date dateCreated; Date dateStarted; Date dateFinished; - Status status; + volatile Status status; static Logger LOGGER = LoggerFactory.getLogger(Job.class); transient boolean aborted = false; - private String errorMessage; - private transient Throwable exception; + private volatile String errorMessage; + private transient volatile Throwable exception; private transient JobListener listener; private long progressUpdateIntervalMs; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index f9ddc4e..e41540b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -17,11 +17,9 @@ package org.apache.zeppelin.scheduler; -import org.apache.thrift.TException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.scheduler.Job.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +32,7 @@ import java.util.concurrent.ExecutorService; /** * RemoteScheduler runs in 
ZeppelinServer and proxies Scheduler running on RemoteInterpreter + * */ public class RemoteScheduler implements Scheduler { Logger logger = LoggerFactory.getLogger(RemoteScheduler.class); @@ -45,17 +44,17 @@ public class RemoteScheduler implements Scheduler { boolean terminate = false; private String name; private int maxConcurrency; - private final String noteId; - private RemoteInterpreterProcess interpreterProcess; + private final String sessionId; + private RemoteInterpreter remoteInterpreter; - public RemoteScheduler(String name, ExecutorService executor, String noteId, - RemoteInterpreterProcess interpreterProcess, SchedulerListener listener, + public RemoteScheduler(String name, ExecutorService executor, String sessionId, + RemoteInterpreter remoteInterpreter, SchedulerListener listener, int maxConcurrency) { this.name = name; this.executor = executor; this.listener = listener; - this.noteId = noteId; - this.interpreterProcess = interpreterProcess; + this.sessionId = sessionId; + this.remoteInterpreter = remoteInterpreter; this.maxConcurrency = maxConcurrency; } @@ -167,14 +166,15 @@ public class RemoteScheduler implements Scheduler { private long initialPeriodMsec; private long initialPeriodCheckIntervalMsec; private long checkIntervalMsec; - private boolean terminate; + private volatile boolean terminate; private JobListener listener; private Job job; - Status lastStatus; + volatile Status lastStatus; public JobStatusPoller(long initialPeriodMsec, long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job, JobListener listener) { + setName("JobStatusPoller-" + job.getId()); this.initialPeriodMsec = initialPeriodMsec; this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec; this.checkIntervalMsec = checkIntervalMsec; @@ -209,7 +209,7 @@ public class RemoteScheduler implements Scheduler { } Status newStatus = getStatus(); - if (newStatus == null) { // unknown + if (newStatus == Status.UNKNOWN) { // unknown continue; } @@ -231,7 +231,9 
@@ public class RemoteScheduler implements Scheduler { private Status getLastStatus() { if (terminate == true) { - if (lastStatus != Status.FINISHED && + if (job.getErrorMessage() != null) { + return Status.ERROR; + } else if (lastStatus != Status.FINISHED && lastStatus != Status.ERROR && lastStatus != Status.ABORT) { return Status.FINISHED; @@ -239,58 +241,35 @@ public class RemoteScheduler implements Scheduler { return (lastStatus == null) ? Status.FINISHED : lastStatus; } } else { - return (lastStatus == null) ? Status.FINISHED : lastStatus; + return (lastStatus == null) ? Status.UNKNOWN : lastStatus; } } public synchronized Job.Status getStatus() { - if (interpreterProcess.referenceCount() <= 0) { + if (!remoteInterpreter.isOpened()) { return getLastStatus(); } - - Client client; - try { - client = interpreterProcess.getClient(); - } catch (Exception e) { - logger.error("Can't get status information", e); - lastStatus = Status.ERROR; - return Status.ERROR; - } - - boolean broken = false; - try { - String statusStr = client.getStatus(noteId, job.getId()); - if ("Unknown".equals(statusStr)) { - // not found this job in the remote schedulers. - // maybe not submitted, maybe already finished - //Status status = getLastStatus(); - listener.afterStatusChange(job, null, null); - return job.getStatus(); - } - Status status = Status.valueOf(statusStr); - lastStatus = status; - listener.afterStatusChange(job, null, status); - return status; - } catch (TException e) { - broken = true; - logger.error("Can't get status information", e); - lastStatus = Status.ERROR; - return Status.ERROR; - } catch (Exception e) { - logger.error("Unknown status", e); - lastStatus = Status.ERROR; - return Status.ERROR; - } finally { - interpreterProcess.releaseClient(client, broken); + Status status = Status.valueOf(remoteInterpreter.getStatus(job.getId())); + if (status == Status.UNKNOWN) { + // not found this job in the remote schedulers. 
+ // maybe not submitted, maybe already finished + //Status status = getLastStatus(); + listener.afterStatusChange(job, null, null); + return job.getStatus(); } + lastStatus = status; + listener.afterStatusChange(job, null, status); + return status; } } + //TODO(zjffdu) need to refactor the schdule module which is too complicated private class JobRunner implements Runnable, JobListener { + private final Logger logger = LoggerFactory.getLogger(JobRunner.class); private Scheduler scheduler; private Job job; - private boolean jobExecuted; - boolean jobSubmittedRemotely; + private volatile boolean jobExecuted; + volatile boolean jobSubmittedRemotely; public JobRunner(Scheduler scheduler, Job job) { this.scheduler = scheduler; @@ -338,20 +317,22 @@ public class RemoteScheduler implements Scheduler { } // set job status based on result. - Status lastStatus = jobStatusPoller.getStatus(); Object jobResult = job.getReturn(); - if (jobResult != null && jobResult instanceof InterpreterResult) { - if (((InterpreterResult) jobResult).code() == Code.ERROR) { - lastStatus = Status.ERROR; - } - } - if (job.getException() != null) { - lastStatus = Status.ERROR; + if (job.isAborted()) { + job.setStatus(Status.ABORT); + } else if (job.getException() != null) { +// logger.info("Job ABORT, " + job.getId()); + job.setStatus(Status.ERROR); + } else if (jobResult != null && jobResult instanceof InterpreterResult + && ((InterpreterResult) jobResult).code() == Code.ERROR) { +// logger.info("Job Error, " + job.getId()); + job.setStatus(Status.ERROR); + } else { +// logger.info("Job Finished, " + job.getId()); + job.setStatus(Status.FINISHED); } synchronized (queue) { - job.setStatus(lastStatus); - if (listener != null) { listener.jobFinished(scheduler, job); } @@ -374,25 +355,6 @@ public class RemoteScheduler implements Scheduler { @Override public void afterStatusChange(Job job, Status before, Status after) { - if (after == null) { // unknown. 
maybe before sumitted remotely, maybe already finished. - if (jobExecuted) { - jobSubmittedRemotely = true; - Object jobResult = job.getReturn(); - if (job.isAborted()) { - job.setStatus(Status.ABORT); - } else if (job.getException() != null) { - job.setStatus(Status.ERROR); - } else if (jobResult != null && jobResult instanceof InterpreterResult - && ((InterpreterResult) jobResult).code() == Code.ERROR) { - job.setStatus(Status.ERROR); - } else { - job.setStatus(Status.FINISHED); - } - } - return; - } - - // Update remoteStatus if (jobExecuted == false) { if (after == Status.FINISHED || after == Status.ABORT @@ -402,14 +364,18 @@ public class RemoteScheduler implements Scheduler { return; } else if (after == Status.RUNNING) { jobSubmittedRemotely = true; + job.setStatus(Status.RUNNING); +// logger.info("Job RUNNING, " + job.getId()); } } else { jobSubmittedRemotely = true; } - // status polled by status poller - if (job.getStatus() != after) { - job.setStatus(after); + // only set status when it is RUNNING + // We would set other status based on the interpret result + if (after == Status.RUNNING) { +// logger.info("Job RUNNING, " + job.getId()); + job.setStatus(Status.RUNNING); } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index af52dec..5871ca5 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -24,17 +24,18 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * TODO(moon) : add description. + * Factory class for creating schedulers + * */ public class SchedulerFactory implements SchedulerListener { private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class); - ExecutorService executor; - Map<String, Scheduler> schedulers = new LinkedHashMap<>(); + private ExecutorService executor; + private Map<String, Scheduler> schedulers = new LinkedHashMap<>(); private static SchedulerFactory singleton; private static Long singletonLock = new Long(0); @@ -54,17 +55,17 @@ public class SchedulerFactory implements SchedulerListener { return singleton; } - public SchedulerFactory() throws Exception { - executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100); + SchedulerFactory() throws Exception { + executor = ExecutorFactory.singleton().createOrGet("SchedulerFactory", 100); } public void destroy() { - ExecutorFactory.singleton().shutdown("schedulerFactory"); + ExecutorFactory.singleton().shutdown("SchedulerFactory"); } public Scheduler createOrGetFIFOScheduler(String name) { synchronized (schedulers) { - if (schedulers.containsKey(name) == false) { + if (!schedulers.containsKey(name)) { Scheduler s = new FIFOScheduler(name, executor, this); schedulers.put(name, s); executor.execute(s); @@ -75,7 +76,7 @@ public class SchedulerFactory implements SchedulerListener { public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) { synchronized (schedulers) { - if (schedulers.containsKey(name) == false) { + if (!schedulers.containsKey(name)) { Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency); schedulers.put(name, s); executor.execute(s); @@ -86,17 +87,17 @@ public class SchedulerFactory implements SchedulerListener { public Scheduler createOrGetRemoteScheduler( 
String name, - String noteId, - RemoteInterpreterProcess interpreterProcess, + String sessionId, + RemoteInterpreter remoteInterpreter, int maxConcurrency) { synchronized (schedulers) { - if (schedulers.containsKey(name) == false) { + if (!schedulers.containsKey(name)) { Scheduler s = new RemoteScheduler( name, executor, - noteId, - interpreterProcess, + sessionId, + remoteInterpreter, this, maxConcurrency); schedulers.put(name, s); @@ -106,38 +107,24 @@ public class SchedulerFactory implements SchedulerListener { } } - public Scheduler removeScheduler(String name) { + public void removeScheduler(String name) { synchronized (schedulers) { Scheduler s = schedulers.remove(name); if (s != null) { s.stop(); } } - return null; - } - - public Collection<Scheduler> listScheduler(String name) { - List<Scheduler> s = new LinkedList<>(); - synchronized (schedulers) { - for (Scheduler ss : schedulers.values()) { - s.add(ss); - } - } - return s; } @Override public void jobStarted(Scheduler scheduler, Job job) { - logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName()); + logger.info("Job " + job.getId() + " started by scheduler " + scheduler.getName()); } @Override public void jobFinished(Scheduler scheduler, Job job) { - logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName()); + logger.info("Job " + job.getId() + " finished by scheduler " + scheduler.getName()); } - - - } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java index 8673476..1926528 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java +++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.tabledata; import org.apache.zeppelin.resource.Resource; -import org.apache.zeppelin.resource.ResourcePoolUtils; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java new file mode 100644 index 0000000..14c03a1 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.util; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +/** + * Generate Tiny ID. 
/**
 * Generates short identifiers by writing a number in base 32 using a fixed
 * alphabet of digits and upper-case letters.
 *
 * <p>The alphabet is the digits 1-9 followed by the upper-case letters with
 * I, L and O left out (presumably to avoid look-alike characters); the
 * {@code DICTIONARY} array is the authoritative list.
 */
public class IdHashes {
  private static final char[] DICTIONARY = new char[] {'1', '2', '3', '4', '5', '6', '7', '8', '9',
    'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V',
    'W', 'X', 'Y', 'Z'};

  /** Shared generator; {@link Random} instances may be used from several threads. */
  private static final Random RANDOM = new Random();

  /**
   * Encodes the given number in the base of {@code DICTIONARY}, most
   * significant digit first. Zero is encoded as the first dictionary
   * character.
   *
   * @param value the number to encode; must not be negative.
   * @return the encoded string, never empty.
   * @throws IllegalArgumentException if {@code value} is negative.
   */
  private static String encode(long value) {
    // A negative value has no representation in this scheme; reject it
    // explicitly instead of looping without ever reaching zero.
    if (value < 0) {
      throw new IllegalArgumentException("value must not be negative: " + value);
    }

    final int base = DICTIONARY.length;
    StringBuilder digits = new StringBuilder();
    long remaining = value;
    // do/while so that zero still yields one digit.
    do {
      digits.append(DICTIONARY[(int) (remaining % base)]);
      remaining /= base;
    } while (remaining != 0);

    // Digits were collected least significant first.
    return digits.reverse().toString();
  }

  /**
   * Creates a new identifier from the current time in milliseconds plus a
   * random {@code int} offset. Uniqueness is not guaranteed.
   *
   * @return the encoded identifier.
   */
  public static String generateId() {
    return encode(System.currentTimeMillis() + RANDOM.nextInt());
  }
}
/**
 * Exposes build metadata (version and Git commit details) read from the
 * {@code /project.properties} and {@code /git.properties} classpath resources.
 *
 * <p>A resource that is missing or cannot be read leaves the corresponding
 * values empty; the getters never return {@code null}.
 */
public class Util {
  private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
  private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev";
  private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";

  private static final Properties projectProperties = loadProperties("/project.properties");
  private static final Properties gitProperties = loadProperties("/git.properties");

  /**
   * Loads a properties file from the classpath on a best-effort basis.
   *
   * @param resourcePath absolute classpath location of the properties file.
   * @return the loaded properties; empty when the resource does not exist,
   *     possibly partial when reading fails midway.
   */
  private static Properties loadProperties(String resourcePath) {
    Properties properties = new Properties();
    // getResourceAsStream returns null for a missing resource; try-with-resources
    // tolerates a null resource, and the stream is closed once loading is done.
    try (java.io.InputStream in = Util.class.getResourceAsStream(resourcePath)) {
      if (in != null) {
        properties.load(in);
      }
    } catch (IOException e) {
      // Deliberately ignored: build metadata is optional, callers get empty values.
    }
    return properties;
  }

  /**
   * Get Zeppelin version
   *
   * @return Current Zeppelin version, or an empty string when unknown
   */
  public static String getVersion() {
    return projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY, "");
  }

  /**
   * Get Zeppelin Git latest commit id
   *
   * @return Latest Zeppelin commit id, or an empty string when unknown
   */
  public static String getGitCommitId() {
    return gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY, "");
  }

  /**
   * Get Zeppelin Git latest commit timestamp
   *
   * @return Latest Zeppelin commit timestamp, or an empty string when unknown
   */
  public static String getGitTimestamp() {
    return gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY, "");
  }
}