http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
new file mode 100644
index 0000000..bb90dd8
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.thrift.TException;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Proxy for Interpreter instance that runs on separate process
+ */
+public class RemoteInterpreter extends Interpreter {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteInterpreter.class);
+  private static final Gson gson = new Gson();
+
+
+  private String className;
+  private String sessionId;
+  private String userName;
+  private FormType formType;
+
+  private RemoteInterpreterProcess interpreterProcess;
+  private volatile boolean isOpened = false;
+  private volatile boolean isCreated = false;
+
+  /**
+   * Remote interpreter and manage interpreter process
+   */
+  public RemoteInterpreter(Properties properties,
+                           String sessionId,
+                           String className,
+                           String userName) {
+    super(properties);
+    this.sessionId = sessionId;
+    this.className = className;
+    this.userName = userName;
+  }
+
+  public boolean isOpened() {
+    return isOpened;
+  }
+
+  @Override
+  public String getClassName() {
+    return className;
+  }
+
+  public String getSessionId() {
+    return this.sessionId;
+  }
+
+  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() 
{
+    if (this.interpreterProcess != null) {
+      return this.interpreterProcess;
+    }
+    InterpreterGroup intpGroup = getInterpreterGroup();
+    this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess();
+    synchronized (interpreterProcess) {
+      if (!interpreterProcess.isRunning()) {
+        interpreterProcess.start(userName, false);
+        interpreterProcess.getRemoteInterpreterEventPoller()
+            .setInterpreterProcess(interpreterProcess);
+        
interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup);
+        interpreterProcess.getRemoteInterpreterEventPoller().start();
+      }
+    }
+    return interpreterProcess;
+  }
+
+  @Override
+  public void open() {
+    synchronized (this) {
+      if (!isOpened) {
+        // create all the interpreters of the same session first, then Open 
the internal interpreter
+        // of this RemoteInterpreter.
+        // The why we we create all the interpreter of the session is because 
some interpreter
+        // depends on other interpreter. e.g. PySparkInterpreter depends on 
SparkInterpreter.
+        // also see method 
Interpreter.getInterpreterInTheSameSessionByClassName
+        for (Interpreter interpreter : 
getInterpreterGroup().getOrCreateSession(
+            userName, sessionId)) {
+          ((RemoteInterpreter) interpreter).internal_create();
+        }
+
+        interpreterProcess.callRemoteFunction(new 
RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            LOGGER.info("Open RemoteInterpreter {}", getClassName());
+            client.open(sessionId, className);
+            // Push angular object loaded from JSON file to remote interpreter
+            synchronized (getInterpreterGroup()) {
+              if (!getInterpreterGroup().isAngularRegistryPushed()) {
+                pushAngularObjectRegistryToRemote(client);
+                getInterpreterGroup().setAngularRegistryPushed(true);
+              }
+            }
+            return null;
+          }
+        });
+        isOpened = true;
+      }
+    }
+  }
+
+  private void internal_create() {
+    synchronized (this) {
+      if (!isCreated) {
+        RemoteInterpreterProcess interpreterProcess = 
getOrCreateInterpreterProcess();
+        interpreterProcess.callRemoteFunction(new 
RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            LOGGER.info("Create RemoteInterpreter {}", getClassName());
+            client.createInterpreter(getInterpreterGroup().getId(), sessionId,
+                className, (Map) property, userName);
+            return null;
+          }
+        });
+        isCreated = true;
+      }
+    }
+  }
+
+
+  @Override
+  public void close() {
+    if (isOpened) {
+      RemoteInterpreterProcess interpreterProcess = 
getOrCreateInterpreterProcess();
+      interpreterProcess.callRemoteFunction(new 
RemoteInterpreterProcess.RemoteFunction<Void>() {
+        @Override
+        public Void call(Client client) throws Exception {
+          client.close(sessionId, className);
+          return null;
+        }
+      });
+    } else {
+      LOGGER.warn("close is called when RemoterInterpreter is not opened for " 
+ className);
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(final String st, final InterpreterContext 
context) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("st:\n{}", st);
+    }
+
+    final FormType form = getFormType();
+    RemoteInterpreterProcess interpreterProcess = 
getOrCreateInterpreterProcess();
+    InterpreterContextRunnerPool interpreterContextRunnerPool = 
interpreterProcess
+        .getInterpreterContextRunnerPool();
+    List<InterpreterContextRunner> runners = context.getRunners();
+    if (runners != null && runners.size() != 0) {
+      // assume all runners in this InterpreterContext have the same note id
+      String noteId = runners.get(0).getNoteId();
+
+      interpreterContextRunnerPool.clear(noteId);
+      interpreterContextRunnerPool.addAll(noteId, runners);
+    }
+    return interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
+          @Override
+          public InterpreterResult call(Client client) throws Exception {
+
+            RemoteInterpreterResult remoteResult = client.interpret(
+                sessionId, className, st, convert(context));
+            Map<String, Object> remoteConfig = (Map<String, Object>) 
gson.fromJson(
+                remoteResult.getConfig(), new TypeToken<Map<String, Object>>() 
{
+                }.getType());
+            context.getConfig().clear();
+            context.getConfig().putAll(remoteConfig);
+            GUI currentGUI = context.getGui();
+            if (form == FormType.NATIVE) {
+              GUI remoteGui = GUI.fromJson(remoteResult.getGui());
+              currentGUI.clear();
+              currentGUI.setParams(remoteGui.getParams());
+              currentGUI.setForms(remoteGui.getForms());
+            } else if (form == FormType.SIMPLE) {
+              final Map<String, Input> currentForms = currentGUI.getForms();
+              final Map<String, Object> currentParams = currentGUI.getParams();
+              final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
+              final Map<String, Input> remoteForms = remoteGUI.getForms();
+              final Map<String, Object> remoteParams = remoteGUI.getParams();
+              currentForms.putAll(remoteForms);
+              currentParams.putAll(remoteParams);
+            }
+
+            InterpreterResult result = convert(remoteResult);
+            return result;
+          }
+        }
+    );
+
+  }
+
+  @Override
+  public void cancel(final InterpreterContext context) {
+    if (!isOpened) {
+      LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for 
" + className);
+      return;
+    }
+    RemoteInterpreterProcess interpreterProcess = 
getOrCreateInterpreterProcess();
+    interpreterProcess.callRemoteFunction(new 
RemoteInterpreterProcess.RemoteFunction<Void>() {
+      @Override
+      public Void call(Client client) throws Exception {
+        client.cancel(sessionId, className, convert(context));
+        return null;
+      }
+    });
+  }
+
+  @Override
+  public FormType getFormType() {
+    if (formType != null) {
+      return formType;
+    }
+
+    // it is possible to call getFormType before it is opened
+    synchronized (this) {
+      if (!isOpened) {
+        open();
+      }
+    }
+    RemoteInterpreterProcess interpreterProcess = 
getOrCreateInterpreterProcess();
+    FormType type = interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<FormType>() {
+          @Override
+          public FormType call(Client client) throws Exception {
+            formType = FormType.valueOf(client.getFormType(sessionId, 
className));
+            return formType;
+          }
+        });
+    return type;
+  }
+
+  @Override
+  public int getProgress(final InterpreterContext context) {
+    if (!isOpened) {
+      LOGGER.warn("getProgress is called when RemoterInterpreter is not opened 
for " + className);
+      return 0;
+    }
+    RemoteInterpreterProcess interpreterProcess = 
getOrCreateInterpreterProcess();
+    return interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Integer>() {
+          @Override
+          public Integer call(Client client) throws Exception {
+            return client.getProgress(sessionId, className, convert(context));
+          }
+        });
+  }
+
+
+  @Override
+  public List<InterpreterCompletion> completion(final String buf, final int 
cursor,
+                                                final InterpreterContext 
interpreterContext) {
+    if (!isOpened) {
+      LOGGER.warn("completion is called when RemoterInterpreter is not opened 
for " + className);
+      return new ArrayList<>();
+    }
+    RemoteInterpreterProcess interpreterProcess = 
getOrCreateInterpreterProcess();
+    return interpreterProcess.callRemoteFunction(
+        new 
RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
+          @Override
+          public List<InterpreterCompletion> call(Client client) throws 
Exception {
+            return client.completion(sessionId, className, buf, cursor,
+                convert(interpreterContext));
+          }
+        });
+  }
+
+  public String getStatus(final String jobId) {
+    if (!isOpened) {
+      LOGGER.warn("getStatus is called when RemoteInterpreter is not opened 
for " + className);
+      return Job.Status.UNKNOWN.name();
+    }
+    RemoteInterpreterProcess interpreterProcess = 
getOrCreateInterpreterProcess();
+    return interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<String>() {
+          @Override
+          public String call(Client client) throws Exception {
+            return client.getStatus(sessionId, jobId);
+          }
+        });
+  }
+
+  //TODO(zjffdu) Share the Scheduler in the same session or in the same 
InterpreterGroup ?
+  @Override
+  public Scheduler getScheduler() {
+    int maxConcurrency = Integer.parseInt(
+        property.getProperty("zeppelin.interpreter.max.poolsize",
+            
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() 
+ ""));
+    return SchedulerFactory.singleton().createOrGetRemoteScheduler(
+        RemoteInterpreter.class.getName() + "-" + sessionId,
+        sessionId, this, maxConcurrency);
+  }
+
+  private RemoteInterpreterContext convert(InterpreterContext ic) {
+    return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), 
ic.getReplName(),
+        ic.getParagraphTitle(), ic.getParagraphText(), 
gson.toJson(ic.getAuthenticationInfo()),
+        gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), 
gson.toJson(ic.getRunners()));
+  }
+
+  private InterpreterResult convert(RemoteInterpreterResult result) {
+    InterpreterResult r = new InterpreterResult(
+        InterpreterResult.Code.valueOf(result.getCode()));
+
+    for (RemoteInterpreterResultMessage m : result.getMsg()) {
+      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
+    }
+
+    return r;
+  }
+
+  /**
+   * Push local angular object registry to
+   * remote interpreter. This method should be
+   * call ONLY once when the first Interpreter is created
+   */
+  private void pushAngularObjectRegistryToRemote(Client client) throws 
TException {
+    final AngularObjectRegistry angularObjectRegistry = 
this.getInterpreterGroup()
+        .getAngularObjectRegistry();
+    if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() 
!= null) {
+      final Map<String, Map<String, AngularObject>> registry = 
angularObjectRegistry
+          .getRegistry();
+      LOGGER.info("Push local angular object registry from ZeppelinServer to" +
+          " remote interpreter group {}", this.getInterpreterGroup().getId());
+      final java.lang.reflect.Type registryType = new TypeToken<Map<String,
+          Map<String, AngularObject>>>() {
+      }.getType();
+      client.angularRegistryPush(gson.toJson(registry, registryType));
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "RemoteInterpreter_" + className + "_" + sessionId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index 26c9d79..f3bce2f 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import 
org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
 import org.apache.zeppelin.resource.Resource;
@@ -38,6 +39,7 @@ import org.apache.zeppelin.resource.ResourceSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
@@ -84,7 +86,6 @@ public class RemoteInterpreterEventPoller extends Thread {
 
   @Override
   public void run() {
-    Client client = null;
     AppendOutputRunner runner = new AppendOutputRunner(listener);
     ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
         runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
@@ -100,26 +101,14 @@ public class RemoteInterpreterEventPoller extends Thread {
         continue;
       }
 
-      try {
-        client = interpreterProcess.getClient();
-      } catch (Exception e1) {
-        logger.error("Can't get RemoteInterpreterEvent", e1);
-        waitQuietly();
-        continue;
-      }
-
-      RemoteInterpreterEvent event = null;
-      boolean broken = false;
-      try {
-        event = client.getEvent();
-      } catch (TException e) {
-        broken = true;
-        logger.error("Can't get RemoteInterpreterEvent", e);
-        waitQuietly();
-        continue;
-      } finally {
-        interpreterProcess.releaseClient(client, broken);
-      }
+      RemoteInterpreterEvent event = interpreterProcess.callRemoteFunction(
+          new 
RemoteInterpreterProcess.RemoteFunction<RemoteInterpreterEvent>() {
+            @Override
+            public RemoteInterpreterEvent call(Client client) throws Exception 
{
+              return client.getEvent();
+            }
+          }
+      );
 
       AngularObjectRegistry angularObjectRegistry = 
interpreterGroup.getAngularObjectRegistry();
 
@@ -286,10 +275,7 @@ public class RemoteInterpreterEventPoller extends Thread {
     boolean broken = false;
     final Gson gson = new Gson();
     final String eventOwnerKey = reqResourceBody.getOwnerKey();
-    Client interpreterServerMain = null;
     try {
-      interpreterServerMain = interpreterProcess.getClient();
-      final Client eventClient = interpreterServerMain;
       if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) 
{
         final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new 
LinkedList<>();
 
@@ -308,7 +294,6 @@ public class RemoteInterpreterEventPoller extends Thread {
 
               @Override
               public void onFinished(Object resultObject) {
-                boolean clientBroken = false;
                 if (resultObject != null && resultObject instanceof List) {
                   List<InterpreterContextRunner> runnerList =
                       (List<InterpreterContextRunner>) resultObject;
@@ -324,15 +309,15 @@ public class RemoteInterpreterEventPoller extends Thread {
                   
resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
                   resResource.setData(remoteRunners);
 
-                  try {
-                    
eventClient.onReceivedZeppelinResource(resResource.toJson());
-                  } catch (Exception e) {
-                    clientBroken = true;
-                    logger.error("Can't get RemoteInterpreterEvent", e);
-                    waitQuietly();
-                  } finally {
-                    interpreterProcess.releaseClient(eventClient, 
clientBroken);
-                  }
+                  interpreterProcess.callRemoteFunction(
+                      new RemoteInterpreterProcess.RemoteFunction<Void>() {
+                        @Override
+                        public Void call(Client client) throws Exception {
+                          
client.onReceivedZeppelinResource(resResource.toJson());
+                          return null;
+                        }
+                      }
+                  );
                 }
               }
 
@@ -346,39 +331,32 @@ public class RemoteInterpreterEventPoller extends Thread {
             reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), 
callBackEvent);
       }
     } catch (Exception e) {
-      broken = true;
       logger.error("Can't get RemoteInterpreterEvent", e);
       waitQuietly();
 
-    } finally {
-      interpreterProcess.releaseClient(interpreterServerMain, broken);
     }
   }
 
-  private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = interpreterProcess.getClient();
-      List<String> resourceList = new LinkedList<>();
-      Gson gson = new Gson();
-      for (Resource r : resourceSet) {
-        resourceList.add(gson.toJson(r));
-      }
-      client.resourcePoolResponseGetAll(resourceList);
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      broken = true;
-    } finally {
-      if (client != null) {
-        interpreterProcess.releaseClient(client, broken);
-      }
-    }
+  private void sendResourcePoolResponseGetAll(final ResourceSet resourceSet) {
+    interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            List<String> resourceList = new LinkedList<>();
+            for (Resource r : resourceSet) {
+              resourceList.add(r.toJson());
+            }
+            client.resourcePoolResponseGetAll(resourceList);
+            return null;
+          }
+        }
+    );
   }
 
   private ResourceSet getAllResourcePoolExcept() {
     ResourceSet resourceSet = new ResourceSet();
-    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
+    for (InterpreterGroup intpGroup : interpreterGroup.getInterpreterSetting()
+        .getInterpreterSettingManager().getAllInterpreterGroup()) {
       if (intpGroup.getId().equals(interpreterGroup.getId())) {
         continue;
       }
@@ -390,115 +368,94 @@ public class RemoteInterpreterEventPoller extends Thread 
{
           resourceSet.addAll(localPool.getAll());
         }
       } else if (interpreterProcess.isRunning()) {
-        Client client = null;
-        boolean broken = false;
-        try {
-          client = remoteInterpreterProcess.getClient();
-          List<String> resourceList = client.resourcePoolGetAll();
-          Gson gson = new Gson();
-          for (String res : resourceList) {
-            resourceSet.add(Resource.fromJson(res));
-          }
-        } catch (Exception e) {
-          logger.error(e.getMessage(), e);
-          broken = true;
-        } finally {
-          if (client != null) {
-            intpGroup.getRemoteInterpreterProcess().releaseClient(client, 
broken);
-          }
+        List<String> resourceList = 
remoteInterpreterProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+              @Override
+              public List<String> call(Client client) throws Exception {
+                return client.resourcePoolGetAll();
+              }
+            }
+        );
+        for (String res : resourceList) {
+          resourceSet.add(Resource.fromJson(res));
         }
       }
     }
     return resourceSet;
   }
 
-  private void sendResourceResponseGet(ResourceId resourceId, Object o) {
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = interpreterProcess.getClient();
-      Gson gson = new Gson();
-      String rid = gson.toJson(resourceId);
-      ByteBuffer obj;
-      if (o == null) {
-        obj = ByteBuffer.allocate(0);
-      } else {
-        obj = Resource.serializeObject(o);
-      }
-      client.resourceResponseGet(rid, obj);
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      broken = true;
-    } finally {
-      if (client != null) {
-        interpreterProcess.releaseClient(client, broken);
-      }
-    }
+  private void sendResourceResponseGet(final ResourceId resourceId, final 
Object o) {
+    interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            String rid = resourceId.toJson();
+            ByteBuffer obj;
+            if (o == null) {
+              obj = ByteBuffer.allocate(0);
+            } else {
+              obj = Resource.serializeObject(o);
+            }
+            client.resourceResponseGet(rid, obj);
+            return null;
+          }
+        }
+    );
   }
 
-  private Object getResource(ResourceId resourceId) {
-    InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
-        resourceId.getResourcePoolId());
+  private Object getResource(final ResourceId resourceId) {
+    InterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
+        .getInterpreterSettingManager()
+        .getInterpreterGroupById(resourceId.getResourcePoolId());
     if (intpGroup == null) {
       return null;
     }
     RemoteInterpreterProcess remoteInterpreterProcess = 
intpGroup.getRemoteInterpreterProcess();
-    if (remoteInterpreterProcess == null) {
-      ResourcePool localPool = intpGroup.getResourcePool();
-      if (localPool != null) {
-        return localPool.get(resourceId.getName());
-      }
-    } else if (interpreterProcess.isRunning()) {
-      Client client = null;
-      boolean broken = false;
-      try {
-        client = remoteInterpreterProcess.getClient();
-        ByteBuffer res = client.resourceGet(
-            resourceId.getNoteId(),
-            resourceId.getParagraphId(),
-            resourceId.getName());
-        Object o = Resource.deserializeObject(res);
-        return o;
-      } catch (Exception e) {
-        logger.error(e.getMessage(), e);
-        broken = true;
-      } finally {
-        if (client != null) {
-          intpGroup.getRemoteInterpreterProcess().releaseClient(client, 
broken);
+    ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
+          @Override
+          public ByteBuffer call(Client client) throws Exception {
+            return  client.resourceGet(
+                resourceId.getNoteId(),
+                resourceId.getParagraphId(),
+                resourceId.getName());
+          }
         }
-      }
-    }
-    return null;
-  }
+    );
 
-  public void sendInvokeMethodResult(InvokeResourceMethodEventMessage message, 
Object o) {
-    Client client = null;
-    boolean broken = false;
     try {
-      client = interpreterProcess.getClient();
-      Gson gson = new Gson();
-      String invokeMessage = gson.toJson(message);
-      ByteBuffer obj;
-      if (o == null) {
-        obj = ByteBuffer.allocate(0);
-      } else {
-        obj = Resource.serializeObject(o);
-      }
-      client.resourceResponseInvokeMethod(invokeMessage, obj);
+      Object o = Resource.deserializeObject(buffer);
+      return o;
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
-      broken = true;
-    } finally {
-      if (client != null) {
-        interpreterProcess.releaseClient(client, broken);
-      }
     }
+    return null;
   }
 
-  private Object invokeResourceMethod(InvokeResourceMethodEventMessage 
message) {
-    ResourceId resourceId = message.resourceId;
-    InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
-        resourceId.getResourcePoolId());
+  public void sendInvokeMethodResult(final InvokeResourceMethodEventMessage 
message,
+                                     final Object o) {
+    interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            String invokeMessage = message.toJson();
+            ByteBuffer obj;
+            if (o == null) {
+              obj = ByteBuffer.allocate(0);
+            } else {
+              obj = Resource.serializeObject(o);
+            }
+            client.resourceResponseInvokeMethod(invokeMessage, obj);
+            return null;
+          }
+        }
+    );
+  }
+
+  private Object invokeResourceMethod(final InvokeResourceMethodEventMessage 
message) {
+    final ResourceId resourceId = message.resourceId;
+    InterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
+        
.getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId());
     if (intpGroup == null) {
       return null;
     }
@@ -529,25 +486,25 @@ public class RemoteInterpreterEventPoller extends Thread {
         return null;
       }
     } else if (interpreterProcess.isRunning()) {
-      Client client = null;
-      boolean broken = false;
+      ByteBuffer res = interpreterProcess.callRemoteFunction(
+          new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
+            @Override
+            public ByteBuffer call(Client client) throws Exception {
+              return client.resourceInvokeMethod(
+                  resourceId.getNoteId(),
+                  resourceId.getParagraphId(),
+                  resourceId.getName(),
+                  message.toJson());
+            }
+          }
+      );
+
       try {
-        client = remoteInterpreterProcess.getClient();
-        ByteBuffer res = client.resourceInvokeMethod(
-            resourceId.getNoteId(),
-            resourceId.getParagraphId(),
-            resourceId.getName(),
-            gson.toJson(message));
-        Object o = Resource.deserializeObject(res);
-        return o;
+        return Resource.deserializeObject(res);
       } catch (Exception e) {
         logger.error(e.getMessage(), e);
-        broken = true;
-      } finally {
-        if (client != null) {
-          intpGroup.getRemoteInterpreterProcess().releaseClient(client, 
broken);
-        }
       }
+      return null;
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
new file mode 100644
index 0000000..19356fb
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.commons.exec.*;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * This class manages start / stop of remote interpreter process
+ */
+public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
+    implements ExecuteResultHandler {
+  private static final Logger logger = LoggerFactory.getLogger(
+      RemoteInterpreterManagedProcess.class);
+  private final String interpreterRunner;
+
+  private DefaultExecutor executor;
+  private ExecuteWatchdog watchdog;
+  boolean running = false;
+  private int port = -1;
+  private final String interpreterDir;
+  private final String localRepoDir;
+  private final String interpreterGroupName;
+
+  private Map<String, String> env;
+
+  public RemoteInterpreterManagedProcess(
+      String intpRunner,
+      String intpDir,
+      String localRepoDir,
+      Map<String, String> env,
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener,
+      String interpreterGroupName) {
+    super(new RemoteInterpreterEventPoller(listener, appListener),
+        connectTimeout);
+    this.interpreterRunner = intpRunner;
+    this.env = env;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.interpreterGroupName = interpreterGroupName;
+  }
+
+  RemoteInterpreterManagedProcess(String intpRunner,
+                                  String intpDir,
+                                  String localRepoDir,
+                                  Map<String, String> env,
+                                  RemoteInterpreterEventPoller 
remoteInterpreterEventPoller,
+                                  int connectTimeout,
+                                  String interpreterGroupName) {
+    super(remoteInterpreterEventPoller,
+        connectTimeout);
+    this.interpreterRunner = intpRunner;
+    this.env = env;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.interpreterGroupName = interpreterGroupName;
+  }
+
+  @Override
+  public String getHost() {
+    return "localhost";
+  }
+
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public void start(String userName, Boolean isUserImpersonate) {
+    // start server process
+    try {
+      port = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+      logger.info("Choose port {} for RemoteInterpreterProcess", port);
+    } catch (IOException e1) {
+      throw new InterpreterException(e1);
+    }
+
+    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
+    cmdLine.addArgument("-d", false);
+    cmdLine.addArgument(interpreterDir, false);
+    cmdLine.addArgument("-p", false);
+    cmdLine.addArgument(Integer.toString(port), false);
+    if (isUserImpersonate && !userName.equals("anonymous")) {
+      cmdLine.addArgument("-u", false);
+      cmdLine.addArgument(userName, false);
+    }
+    cmdLine.addArgument("-l", false);
+    cmdLine.addArgument(localRepoDir, false);
+    cmdLine.addArgument("-g", false);
+    cmdLine.addArgument(interpreterGroupName, false);
+
+    executor = new DefaultExecutor();
+
+    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
+    ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
+    processOutput.setOutputStream(cmdOut);
+
+    executor.setStreamHandler(new PumpStreamHandler(processOutput));
+    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+    executor.setWatchdog(watchdog);
+
+    try {
+      Map procEnv = EnvironmentUtils.getProcEnvironment();
+      procEnv.putAll(env);
+
+      logger.info("Run interpreter process {}", cmdLine);
+      executor.execute(cmdLine, procEnv, this);
+      running = true;
+    } catch (IOException e) {
+      running = false;
+      throw new InterpreterException(e);
+    }
+
+
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
+      if (!running) {
+        try {
+          cmdOut.flush();
+        } catch (IOException e) {
+          // nothing to do
+        }
+        throw new InterpreterException(new String(cmdOut.toByteArray()));
+      }
+
+      try {
+        if 
(RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
+          break;
+        } else {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            logger.error("Exception in RemoteInterpreterProcess while 
synchronized reference " +
+                    "Thread.sleep", e);
+          }
+        }
+      } catch (Exception e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Remote interpreter not yet accessible at localhost:" + 
port);
+        }
+      }
+    }
+    processOutput.setOutputStream(null);
+  }
+
+  public void stop() {
+    if (isRunning()) {
+      logger.info("kill interpreter process");
+      try {
+        callRemoteFunction(new RemoteFunction<Void>() {
+          @Override
+          public Void call(RemoteInterpreterService.Client client) throws 
Exception {
+            client.shutdown();
+            return null;
+          }
+        });
+      } catch (Exception e) {
+        logger.warn("ignore the exception when shutting down");
+      }
+      watchdog.destroyProcess();
+    }
+
+    executor = null;
+    watchdog = null;
+    running = false;
+    logger.info("Remote process terminated");
+  }
+
+  @Override
+  public void onProcessComplete(int exitValue) {
+    logger.info("Interpreter process exited {}", exitValue);
+    running = false;
+
+  }
+
+  @Override
+  public void onProcessFailed(ExecuteException e) {
+    logger.info("Interpreter process failed {}", e);
+    running = false;
+  }
+
+  public boolean isRunning() {
+    return running;
+  }
+
+  private static class ProcessLogOutputStream extends LogOutputStream {
+
+    private Logger logger;
+    OutputStream out;
+
+    public ProcessLogOutputStream(Logger logger) {
+      this.logger = logger;
+    }
+
+    @Override
+    protected void processLine(String s, int i) {
+      this.logger.debug(s);
+    }
+
+    @Override
+    public void write(byte [] b) throws IOException {
+      super.write(b);
+
+      if (out != null) {
+        synchronized (this) {
+          if (out != null) {
+            out.write(b);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void write(byte [] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+
+      if (out != null) {
+        synchronized (this) {
+          if (out != null) {
+            out.write(b, offset, len);
+          }
+        }
+      }
+    }
+
+    public void setOutputStream(OutputStream out) {
+      synchronized (this) {
+        this.out = out;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 1d48a1e..a78088c 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -20,10 +20,13 @@ import com.google.gson.Gson;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.thrift.TException;
 import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -32,9 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class RemoteInterpreterProcess {
   private static final Logger logger = 
LoggerFactory.getLogger(RemoteInterpreterProcess.class);
 
-  // number of sessions that are attached to this process
-  private final AtomicInteger referenceCount;
-
   private GenericObjectPool<Client> clientPool;
   private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
   private final InterpreterContextRunnerPool interpreterContextRunnerPool;
@@ -46,16 +46,20 @@ public abstract class RemoteInterpreterProcess {
       ApplicationEventListener appListener) {
     this(new RemoteInterpreterEventPoller(listener, appListener),
         connectTimeout);
+    this.remoteInterpreterEventPoller.setInterpreterProcess(this);
   }
 
   RemoteInterpreterProcess(RemoteInterpreterEventPoller 
remoteInterpreterEventPoller,
                            int connectTimeout) {
     this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
-    referenceCount = new AtomicInteger(0);
     this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
     this.connectTimeout = connectTimeout;
   }
 
+  public RemoteInterpreterEventPoller getRemoteInterpreterEventPoller() {
+    return remoteInterpreterEventPoller;
+  }
+
   public abstract String getHost();
   public abstract int getPort();
   public abstract void start(String userName, Boolean isUserImpersonate);
@@ -66,37 +70,18 @@ public abstract class RemoteInterpreterProcess {
     return connectTimeout;
   }
 
-  public int reference(InterpreterGroup interpreterGroup, String userName,
-                       Boolean isUserImpersonate) {
-    synchronized (referenceCount) {
-      if (!isRunning()) {
-        start(userName, isUserImpersonate);
-      }
-
-      if (clientPool == null) {
-        clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), 
getPort()));
-        clientPool.setTestOnBorrow(true);
-
-        remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
-        remoteInterpreterEventPoller.setInterpreterProcess(this);
-        remoteInterpreterEventPoller.start();
-      }
-      return referenceCount.incrementAndGet();
-    }
-  }
-
-  public Client getClient() throws Exception {
+  public synchronized Client getClient() throws Exception {
     if (clientPool == null || clientPool.isClosed()) {
-      return null;
+      clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), 
getPort()));
     }
     return clientPool.borrowObject();
   }
 
-  public void releaseClient(Client client) {
+  private void releaseClient(Client client) {
     releaseClient(client, false);
   }
 
-  public void releaseClient(Client client, boolean broken) {
+  private void releaseClient(Client client, boolean broken) {
     if (broken) {
       releaseBrokenClient(client);
     } else {
@@ -108,7 +93,7 @@ public abstract class RemoteInterpreterProcess {
     }
   }
 
-  public void releaseBrokenClient(Client client) {
+  private void releaseBrokenClient(Client client) {
     try {
       clientPool.invalidateObject(client);
     } catch (Exception e) {
@@ -116,90 +101,6 @@ public abstract class RemoteInterpreterProcess {
     }
   }
 
-  public int dereference() {
-    synchronized (referenceCount) {
-      int r = referenceCount.decrementAndGet();
-      if (r == 0) {
-        logger.info("shutdown interpreter process");
-        remoteInterpreterEventPoller.shutdown();
-
-        // first try shutdown
-        Client client = null;
-        try {
-          client = getClient();
-          client.shutdown();
-        } catch (Exception e) {
-          // safely ignore exception while client.shutdown() may terminates 
remote process
-          logger.info("Exception in RemoteInterpreterProcess while 
synchronized dereference, can " +
-              "safely ignore exception while client.shutdown() may terminates 
remote process");
-          logger.debug(e.getMessage(), e);
-        } finally {
-          if (client != null) {
-            // no longer used
-            releaseBrokenClient(client);
-          }
-        }
-
-        clientPool.clear();
-        clientPool.close();
-
-        // wait for some time (connectTimeout) and force kill
-        // remote process server.serve() loop is not always finishing 
gracefully
-        long startTime = System.currentTimeMillis();
-        while (System.currentTimeMillis() - startTime < connectTimeout) {
-          if (this.isRunning()) {
-            try {
-              Thread.sleep(500);
-            } catch (InterruptedException e) {
-              logger.error("Exception in RemoteInterpreterProcess while 
synchronized dereference " +
-                  "Thread.sleep", e);
-            }
-          } else {
-            break;
-          }
-        }
-      }
-      return r;
-    }
-  }
-
-  public int referenceCount() {
-    synchronized (referenceCount) {
-      return referenceCount.get();
-    }
-  }
-
-  public int getNumActiveClient() {
-    if (clientPool == null) {
-      return 0;
-    } else {
-      return clientPool.getNumActive();
-    }
-  }
-
-  public int getNumIdleClient() {
-    if (clientPool == null) {
-      return 0;
-    } else {
-      return clientPool.getNumIdle();
-    }
-  }
-
-  public void setMaxPoolSize(int size) {
-    if (clientPool != null) {
-      //Size + 2 for progress poller , cancel operation
-      clientPool.setMaxTotal(size + 2);
-    }
-  }
-
-  public int getMaxPoolSize() {
-    if (clientPool != null) {
-      return clientPool.getMaxTotal();
-    } else {
-      return 0;
-    }
-  }
-
   /**
    * Called when angular object is updated in client side to propagate
    * change to the remote process
@@ -239,4 +140,33 @@ public abstract class RemoteInterpreterProcess {
   public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
     return interpreterContextRunnerPool;
   }
+
+  public <T> T callRemoteFunction(RemoteFunction<T> func) {
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = getClient();
+      if (client != null) {
+        return func.call(client);
+      }
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    } finally {
+      if (client != null) {
+        releaseClient(client, broken);
+      }
+    }
+    return null;
+  }
+
+  /**
+   *
+   * @param <T>
+   */
+  public interface RemoteFunction<T> {
+    T call(Client client) throws Exception;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
new file mode 100644
index 0000000..bb176be
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class connects to existing process
+ */
+public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
+  private final Logger logger = 
LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
+  private final String host;
+  private final int port;
+
+  public RemoteInterpreterRunningProcess(
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener,
+      String host,
+      int port
+  ) {
+    super(connectTimeout, listener, appListener);
+    this.host = host;
+    this.port = port;
+  }
+
+  @Override
+  public String getHost() {
+    return host;
+  }
+
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public void start(String userName, Boolean isUserImpersonate) {
+    // assume process is externally managed. nothing to do
+  }
+
+  @Override
+  public void stop() {
+    // assume process is externally managed. nothing to do
+  }
+
+  @Override
+  public boolean isRunning() {
+    return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), 
getPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 3853468..3d8123e 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -106,6 +106,7 @@ public class RemoteInterpreterServer
 
   @Override
   public void shutdown() throws TException {
+    logger.info("Shutting down...");
     eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT);
     if (interpreterGroup != null) {
       interpreterGroup.close();
@@ -159,7 +160,7 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public void createInterpreter(String interpreterGroupId, String sessionKey, 
String
+  public void createInterpreter(String interpreterGroupId, String sessionId, 
String
       className, Map<String, String> properties, String userName) throws 
TException {
     if (interpreterGroup == null) {
       interpreterGroup = new InterpreterGroup(interpreterGroupId);
@@ -190,20 +191,11 @@ public class RemoteInterpreterServer
           replClass.getConstructor(new Class[] {Properties.class});
       Interpreter repl = constructor.newInstance(p);
       repl.setClassloaderUrls(new URL[]{});
-
-      synchronized (interpreterGroup) {
-        List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
-        if (interpreters == null) {
-          interpreters = new LinkedList<>();
-          interpreterGroup.put(sessionKey, interpreters);
-        }
-
-        interpreters.add(new LazyOpenInterpreter(repl));
-      }
-
       logger.info("Instantiate interpreter {}", className);
       repl.setInterpreterGroup(interpreterGroup);
       repl.setUserName(userName);
+
+      interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(repl), 
sessionId);
     } catch (ClassNotFoundException | NoSuchMethodException | SecurityException
         | InstantiationException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException e) {
@@ -237,13 +229,13 @@ public class RemoteInterpreterServer
     }
   }
 
-  protected Interpreter getInterpreter(String sessionKey, String className) 
throws TException {
+  protected Interpreter getInterpreter(String sessionId, String className) 
throws TException {
     if (interpreterGroup == null) {
       throw new TException(
           new InterpreterException("Interpreter instance " + className + " not 
created"));
     }
     synchronized (interpreterGroup) {
-      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      List<Interpreter> interpreters = interpreterGroup.get(sessionId);
       if (interpreters == null) {
         throw new TException(
             new InterpreterException("Interpreter " + className + " not 
initialized"));
@@ -259,19 +251,20 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public void open(String noteId, String className) throws TException {
-    Interpreter intp = getInterpreter(noteId, className);
+  public void open(String sessionId, String className) throws TException {
+    logger.info(String.format("Open Interpreter %s for session %s ", 
className, sessionId));
+    Interpreter intp = getInterpreter(sessionId, className);
     intp.open();
   }
 
   @Override
-  public void close(String sessionKey, String className) throws TException {
+  public void close(String sessionId, String className) throws TException {
     // unload all applications
     for (String appId : runningApplications.keySet()) {
       RunningApplication appInfo = runningApplications.get(appId);
 
       // see NoteInterpreterLoader.SHARED_SESSION
-      if (appInfo.noteId.equals(sessionKey) || 
sessionKey.equals("shared_session")) {
+      if (appInfo.noteId.equals(sessionId) || 
sessionId.equals("shared_session")) {
         try {
           logger.info("Unload App {} ", appInfo.pkg.getName());
           appInfo.app.unload();
@@ -286,7 +279,7 @@ public class RemoteInterpreterServer
     // close interpreters
     List<Interpreter> interpreters;
     synchronized (interpreterGroup) {
-      interpreters = interpreterGroup.get(sessionKey);
+      interpreters = interpreterGroup.get(sessionId);
     }
     if (interpreters != null) {
       Iterator<Interpreter> it = interpreters.iterator();
@@ -322,7 +315,6 @@ public class RemoteInterpreterServer
         intp,
         st,
         context);
-
     scheduler.submit(job);
 
     while (!job.isTerminated()) {
@@ -566,30 +558,34 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public int getProgress(String noteId, String className,
+  public int getProgress(String sessionId, String className,
                          RemoteInterpreterContext interpreterContext)
       throws TException {
     Integer manuallyProvidedProgress = 
progressMap.get(interpreterContext.getParagraphId());
     if (manuallyProvidedProgress != null) {
       return manuallyProvidedProgress;
     } else {
-      Interpreter intp = getInterpreter(noteId, className);
+      Interpreter intp = getInterpreter(sessionId, className);
+      if (intp == null) {
+        throw new TException("No interpreter {} existed for session {}".format(
+            className, sessionId));
+      }
       return intp.getProgress(convert(interpreterContext, null));
     }
   }
 
 
   @Override
-  public String getFormType(String noteId, String className) throws TException 
{
-    Interpreter intp = getInterpreter(noteId, className);
+  public String getFormType(String sessionId, String className) throws 
TException {
+    Interpreter intp = getInterpreter(sessionId, className);
     return intp.getFormType().toString();
   }
 
   @Override
-  public List<InterpreterCompletion> completion(String noteId,
+  public List<InterpreterCompletion> completion(String sessionId,
       String className, String buf, int cursor, RemoteInterpreterContext 
remoteInterpreterContext)
       throws TException {
-    Interpreter intp = getInterpreter(noteId, className);
+    Interpreter intp = getInterpreter(sessionId, className);
     List completion = intp.completion(buf, cursor, 
convert(remoteInterpreterContext, null));
     return completion;
   }
@@ -766,16 +762,16 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public String getStatus(String sessionKey, String jobId)
+  public String getStatus(String sessionId, String jobId)
       throws TException {
     if (interpreterGroup == null) {
-      return "Unknown";
+      return Status.UNKNOWN.name();
     }
 
     synchronized (interpreterGroup) {
-      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      List<Interpreter> interpreters = interpreterGroup.get(sessionId);
       if (interpreters == null) {
-        return "Unknown";
+        return Status.UNKNOWN.name();
       }
 
       for (Interpreter intp : interpreters) {
@@ -792,7 +788,7 @@ public class RemoteInterpreterServer
         }
       }
     }
-    return "Unknown";
+    return Status.UNKNOWN.name();
   }
 
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
deleted file mode 100644
index b26995a..0000000
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.resource;
-
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.slf4j.Logger;
-
-import java.util.List;
-
-/**
- * Utilities for ResourcePool
- */
-public class ResourcePoolUtils {
-  static Logger logger = 
org.slf4j.LoggerFactory.getLogger(ResourcePoolUtils.class);
-
-  public static ResourceSet getAllResources() {
-    return getAllResourcesExcept(null);
-  }
-
-  public static ResourceSet getAllResourcesExcept(String 
interpreterGroupExcludsion) {
-    ResourceSet resourceSet = new ResourceSet();
-    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
-      if (interpreterGroupExcludsion != null &&
-          intpGroup.getId().equals(interpreterGroupExcludsion)) {
-        continue;
-      }
-
-      RemoteInterpreterProcess remoteInterpreterProcess = 
intpGroup.getRemoteInterpreterProcess();
-      if (remoteInterpreterProcess == null) {
-        ResourcePool localPool = intpGroup.getResourcePool();
-        if (localPool != null) {
-          resourceSet.addAll(localPool.getAll());
-        }
-      } else if (remoteInterpreterProcess.isRunning()) {
-        RemoteInterpreterService.Client client = null;
-        boolean broken = false;
-        try {
-          client = remoteInterpreterProcess.getClient();
-          if (client == null) {
-            // remote interpreter may not started yet or terminated.
-            continue;
-          }
-          List<String> resourceList = client.resourcePoolGetAll();
-          for (String res : resourceList) {
-            resourceSet.add(Resource.fromJson(res));
-          }
-        } catch (Exception e) {
-          logger.error(e.getMessage(), e);
-          broken = true;
-        } finally {
-          if (client != null) {
-            intpGroup.getRemoteInterpreterProcess().releaseClient(client, 
broken);
-          }
-        }
-      }
-    }
-    return resourceSet;
-  }
-
-  public static void removeResourcesBelongsToNote(String noteId) {
-    removeResourcesBelongsToParagraph(noteId, null);
-  }
-
-  public static void removeResourcesBelongsToParagraph(String noteId, String 
paragraphId) {
-    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
-      ResourceSet resourceSet = new ResourceSet();
-      RemoteInterpreterProcess remoteInterpreterProcess = 
intpGroup.getRemoteInterpreterProcess();
-      if (remoteInterpreterProcess == null) {
-        ResourcePool localPool = intpGroup.getResourcePool();
-        if (localPool != null) {
-          resourceSet.addAll(localPool.getAll());
-        }
-        if (noteId != null) {
-          resourceSet = resourceSet.filterByNoteId(noteId);
-        }
-        if (paragraphId != null) {
-          resourceSet = resourceSet.filterByParagraphId(paragraphId);
-        }
-
-        for (Resource r : resourceSet) {
-          localPool.remove(
-              r.getResourceId().getNoteId(),
-              r.getResourceId().getParagraphId(),
-              r.getResourceId().getName());
-        }
-      } else if (remoteInterpreterProcess.isRunning()) {
-        RemoteInterpreterService.Client client = null;
-        boolean broken = false;
-        try {
-          client = remoteInterpreterProcess.getClient();
-          List<String> resourceList = client.resourcePoolGetAll();
-          for (String res : resourceList) {
-            resourceSet.add(Resource.fromJson(res));
-          }
-
-          if (noteId != null) {
-            resourceSet = resourceSet.filterByNoteId(noteId);
-          }
-          if (paragraphId != null) {
-            resourceSet = resourceSet.filterByParagraphId(paragraphId);
-          }
-
-          for (Resource r : resourceSet) {
-            client.resourceRemove(
-                r.getResourceId().getNoteId(),
-                r.getResourceId().getParagraphId(),
-                r.getResourceId().getName());
-          }
-        } catch (Exception e) {
-          logger.error(e.getMessage(), e);
-          broken = true;
-        } finally {
-          if (client != null) {
-            intpGroup.getRemoteInterpreterProcess().releaseClient(client, 
broken);
-          }
-        }
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index d0025d8..191902a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -41,6 +41,7 @@ public abstract class Job {
   /**
    * Job status.
    *
+   * UNKNOWN - Job is not found in remote
    * READY - Job is not running, ready to run.
    * PENDING - Job is submitted to scheduler. but not running yet
    * RUNNING - Job is running.
@@ -48,8 +49,8 @@ public abstract class Job {
    * ERROR - Job finished run. with error
    * ABORT - Job finished by abort
    */
-  public static enum Status {
-    READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
+  public enum Status {
+    UNKNOWN, READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
 
     public boolean isReady() {
       return this == READY;
@@ -70,14 +71,14 @@ public abstract class Job {
   Date dateCreated;
   Date dateStarted;
   Date dateFinished;
-  Status status;
+  volatile Status status;
 
   static Logger LOGGER = LoggerFactory.getLogger(Job.class);
 
   transient boolean aborted = false;
 
-  private String errorMessage;
-  private transient Throwable exception;
+  private volatile String errorMessage;
+  private transient volatile Throwable exception;
   private transient JobListener listener;
   private long progressUpdateIntervalMs;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index f9ddc4e..e41540b 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -17,11 +17,9 @@
 
 package org.apache.zeppelin.scheduler;
 
-import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 
 /**
  * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on 
RemoteInterpreter
+ *
  */
 public class RemoteScheduler implements Scheduler {
   Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
@@ -45,17 +44,17 @@ public class RemoteScheduler implements Scheduler {
   boolean terminate = false;
   private String name;
   private int maxConcurrency;
-  private final String noteId;
-  private RemoteInterpreterProcess interpreterProcess;
+  private final String sessionId;
+  private RemoteInterpreter remoteInterpreter;
 
-  public RemoteScheduler(String name, ExecutorService executor, String noteId,
-                         RemoteInterpreterProcess interpreterProcess, 
SchedulerListener listener,
+  public RemoteScheduler(String name, ExecutorService executor, String 
sessionId,
+                         RemoteInterpreter remoteInterpreter, 
SchedulerListener listener,
                          int maxConcurrency) {
     this.name = name;
     this.executor = executor;
     this.listener = listener;
-    this.noteId = noteId;
-    this.interpreterProcess = interpreterProcess;
+    this.sessionId = sessionId;
+    this.remoteInterpreter = remoteInterpreter;
     this.maxConcurrency = maxConcurrency;
   }
 
@@ -167,14 +166,15 @@ public class RemoteScheduler implements Scheduler {
     private long initialPeriodMsec;
     private long initialPeriodCheckIntervalMsec;
     private long checkIntervalMsec;
-    private boolean terminate;
+    private volatile boolean terminate;
     private JobListener listener;
     private Job job;
-    Status lastStatus;
+    volatile Status lastStatus;
 
     public JobStatusPoller(long initialPeriodMsec,
         long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
         JobListener listener) {
+      setName("JobStatusPoller-" + job.getId());
       this.initialPeriodMsec = initialPeriodMsec;
       this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
       this.checkIntervalMsec = checkIntervalMsec;
@@ -209,7 +209,7 @@ public class RemoteScheduler implements Scheduler {
         }
 
         Status newStatus = getStatus();
-        if (newStatus == null) { // unknown
+        if (newStatus == Status.UNKNOWN) { // unknown
           continue;
         }
 
@@ -231,7 +231,9 @@ public class RemoteScheduler implements Scheduler {
 
     private Status getLastStatus() {
       if (terminate == true) {
-        if (lastStatus != Status.FINISHED &&
+        if (job.getErrorMessage() != null) {
+          return Status.ERROR;
+        } else if (lastStatus != Status.FINISHED &&
             lastStatus != Status.ERROR &&
             lastStatus != Status.ABORT) {
           return Status.FINISHED;
@@ -239,58 +241,35 @@ public class RemoteScheduler implements Scheduler {
           return (lastStatus == null) ? Status.FINISHED : lastStatus;
         }
       } else {
-        return (lastStatus == null) ? Status.FINISHED : lastStatus;
+        return (lastStatus == null) ? Status.UNKNOWN : lastStatus;
       }
     }
 
     public synchronized Job.Status getStatus() {
-      if (interpreterProcess.referenceCount() <= 0) {
+      if (!remoteInterpreter.isOpened()) {
         return getLastStatus();
       }
-
-      Client client;
-      try {
-        client = interpreterProcess.getClient();
-      } catch (Exception e) {
-        logger.error("Can't get status information", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      }
-
-      boolean broken = false;
-      try {
-        String statusStr = client.getStatus(noteId, job.getId());
-        if ("Unknown".equals(statusStr)) {
-          // not found this job in the remote schedulers.
-          // maybe not submitted, maybe already finished
-          //Status status = getLastStatus();
-          listener.afterStatusChange(job, null, null);
-          return job.getStatus();
-        }
-        Status status = Status.valueOf(statusStr);
-        lastStatus = status;
-        listener.afterStatusChange(job, null, status);
-        return status;
-      } catch (TException e) {
-        broken = true;
-        logger.error("Can't get status information", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      } catch (Exception e) {
-        logger.error("Unknown status", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      } finally {
-        interpreterProcess.releaseClient(client, broken);
+      Status status = Status.valueOf(remoteInterpreter.getStatus(job.getId()));
+      if (status == Status.UNKNOWN) {
+        // not found this job in the remote schedulers.
+        // maybe not submitted, maybe already finished
+        //Status status = getLastStatus();
+        listener.afterStatusChange(job, null, null);
+        return job.getStatus();
       }
+      lastStatus = status;
+      listener.afterStatusChange(job, null, status);
+      return status;
     }
   }
 
+  //TODO(zjffdu) need to refactor the schdule module which is too complicated
   private class JobRunner implements Runnable, JobListener {
+    private final Logger logger = LoggerFactory.getLogger(JobRunner.class);
     private Scheduler scheduler;
     private Job job;
-    private boolean jobExecuted;
-    boolean jobSubmittedRemotely;
+    private volatile boolean jobExecuted;
+    volatile boolean jobSubmittedRemotely;
 
     public JobRunner(Scheduler scheduler, Job job) {
       this.scheduler = scheduler;
@@ -338,20 +317,22 @@ public class RemoteScheduler implements Scheduler {
       }
 
       // set job status based on result.
-      Status lastStatus = jobStatusPoller.getStatus();
       Object jobResult = job.getReturn();
-      if (jobResult != null && jobResult instanceof InterpreterResult) {
-        if (((InterpreterResult) jobResult).code() == Code.ERROR) {
-          lastStatus = Status.ERROR;
-        }
-      }
-      if (job.getException() != null) {
-        lastStatus = Status.ERROR;
+      if (job.isAborted()) {
+        job.setStatus(Status.ABORT);
+      } else if (job.getException() != null) {
+//        logger.info("Job ABORT, " + job.getId());
+        job.setStatus(Status.ERROR);
+      } else if (jobResult != null && jobResult instanceof InterpreterResult
+          && ((InterpreterResult) jobResult).code() == Code.ERROR) {
+//        logger.info("Job Error, " + job.getId());
+        job.setStatus(Status.ERROR);
+      } else {
+//        logger.info("Job Finished, " + job.getId());
+        job.setStatus(Status.FINISHED);
       }
 
       synchronized (queue) {
-        job.setStatus(lastStatus);
-
         if (listener != null) {
           listener.jobFinished(scheduler, job);
         }
@@ -374,25 +355,6 @@ public class RemoteScheduler implements Scheduler {
 
     @Override
     public void afterStatusChange(Job job, Status before, Status after) {
-      if (after == null) { // unknown. maybe before sumitted remotely, maybe 
already finished.
-        if (jobExecuted) {
-          jobSubmittedRemotely = true;
-          Object jobResult = job.getReturn();
-          if (job.isAborted()) {
-            job.setStatus(Status.ABORT);
-          } else if (job.getException() != null) {
-            job.setStatus(Status.ERROR);
-          } else if (jobResult != null && jobResult instanceof 
InterpreterResult
-              && ((InterpreterResult) jobResult).code() == Code.ERROR) {
-            job.setStatus(Status.ERROR);
-          } else {
-            job.setStatus(Status.FINISHED);
-          }
-        }
-        return;
-      }
-
-
       // Update remoteStatus
       if (jobExecuted == false) {
         if (after == Status.FINISHED || after == Status.ABORT
@@ -402,14 +364,18 @@ public class RemoteScheduler implements Scheduler {
           return;
         } else if (after == Status.RUNNING) {
           jobSubmittedRemotely = true;
+          job.setStatus(Status.RUNNING);
+//          logger.info("Job RUNNING, " + job.getId());
         }
       } else {
         jobSubmittedRemotely = true;
       }
 
-      // status polled by status poller
-      if (job.getStatus() != after) {
-        job.setStatus(after);
+      // only set status when it is RUNNING
+      // We would set other status based on the interpret result
+      if (after == Status.RUNNING) {
+//        logger.info("Job RUNNING, " + job.getId());
+        job.setStatus(Status.RUNNING);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index af52dec..5871ca5 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -24,17 +24,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * TODO(moon) : add description.
+ * Factory class for creating schedulers
+ *
  */
 public class SchedulerFactory implements SchedulerListener {
   private static final Logger logger = 
LoggerFactory.getLogger(SchedulerFactory.class);
-  ExecutorService executor;
-  Map<String, Scheduler> schedulers = new LinkedHashMap<>();
+  private ExecutorService executor;
+  private Map<String, Scheduler> schedulers = new LinkedHashMap<>();
 
   private static SchedulerFactory singleton;
   private static Long singletonLock = new Long(0);
@@ -54,17 +55,17 @@ public class SchedulerFactory implements SchedulerListener {
     return singleton;
   }
 
-  public SchedulerFactory() throws Exception {
-    executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 
100);
+  SchedulerFactory() throws Exception {
+    executor = ExecutorFactory.singleton().createOrGet("SchedulerFactory", 
100);
   }
 
   public void destroy() {
-    ExecutorFactory.singleton().shutdown("schedulerFactory");
+    ExecutorFactory.singleton().shutdown("SchedulerFactory");
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
     synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
+      if (!schedulers.containsKey(name)) {
         Scheduler s = new FIFOScheduler(name, executor, this);
         schedulers.put(name, s);
         executor.execute(s);
@@ -75,7 +76,7 @@ public class SchedulerFactory implements SchedulerListener {
 
   public Scheduler createOrGetParallelScheduler(String name, int 
maxConcurrency) {
     synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
+      if (!schedulers.containsKey(name)) {
         Scheduler s = new ParallelScheduler(name, executor, this, 
maxConcurrency);
         schedulers.put(name, s);
         executor.execute(s);
@@ -86,17 +87,17 @@ public class SchedulerFactory implements SchedulerListener {
 
   public Scheduler createOrGetRemoteScheduler(
       String name,
-      String noteId,
-      RemoteInterpreterProcess interpreterProcess,
+      String sessionId,
+      RemoteInterpreter remoteInterpreter,
       int maxConcurrency) {
 
     synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
+      if (!schedulers.containsKey(name)) {
         Scheduler s = new RemoteScheduler(
             name,
             executor,
-            noteId,
-            interpreterProcess,
+            sessionId,
+            remoteInterpreter,
             this,
             maxConcurrency);
         schedulers.put(name, s);
@@ -106,38 +107,24 @@ public class SchedulerFactory implements 
SchedulerListener {
     }
   }
 
-  public Scheduler removeScheduler(String name) {
+  public void removeScheduler(String name) {
     synchronized (schedulers) {
       Scheduler s = schedulers.remove(name);
       if (s != null) {
         s.stop();
       }
     }
-    return null;
-  }
-
-  public Collection<Scheduler> listScheduler(String name) {
-    List<Scheduler> s = new LinkedList<>();
-    synchronized (schedulers) {
-      for (Scheduler ss : schedulers.values()) {
-        s.add(ss);
-      }
-    }
-    return s;
   }
 
   @Override
   public void jobStarted(Scheduler scheduler, Job job) {
-    logger.info("Job " + job.getJobName() + " started by scheduler " + 
scheduler.getName());
+    logger.info("Job " + job.getId() + " started by scheduler " + 
scheduler.getName());
 
   }
 
   @Override
   public void jobFinished(Scheduler scheduler, Job job) {
-    logger.info("Job " + job.getJobName() + " finished by scheduler " + 
scheduler.getName());
+    logger.info("Job " + job.getId() + " finished by scheduler " + 
scheduler.getName());
 
   }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
index 8673476..1926528 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
@@ -17,7 +17,6 @@
 package org.apache.zeppelin.tabledata;
 
 import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
 
 import java.util.Iterator;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
new file mode 100644
index 0000000..14c03a1
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Generate Tiny ID.
+ */
+public class IdHashes {
+  private static final char[] DICTIONARY = new char[] {'1', '2', '3', '4', 
'5', '6', '7', '8', '9',
+    'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'M', 'N', 'P', 'Q', 'R', 
'S', 'T', 'U', 'V',
+    'W', 'X', 'Y', 'Z'};
+
+  /**
+   * encodes the given string into the base of the dictionary provided in the 
constructor.
+   *
+   * @param value the number to encode.
+   * @return the encoded string.
+   */
+  private static String encode(Long value) {
+
+    List<Character> result = new ArrayList<>();
+    BigInteger base = new BigInteger("" + DICTIONARY.length);
+    int exponent = 1;
+    BigInteger remaining = new BigInteger(value.toString());
+    while (true) {
+      BigInteger a = base.pow(exponent); // 16^1 = 16
+      BigInteger b = remaining.mod(a); // 119 % 16 = 7 | 112 % 256 = 112
+      BigInteger c = base.pow(exponent - 1);
+      BigInteger d = b.divide(c);
+
+      // if d > dictionary.length, we have a problem. but BigInteger doesnt 
have
+      // a greater than method :-( hope for the best. theoretically, d is 
always
+      // an index of the dictionary!
+      result.add(DICTIONARY[d.intValue()]);
+      remaining = remaining.subtract(b); // 119 - 7 = 112 | 112 - 112 = 0
+
+      // finished?
+      if (remaining.equals(BigInteger.ZERO)) {
+        break;
+      }
+
+      exponent++;
+    }
+
+    // need to reverse it, since the start of the list contains the least 
significant values
+    StringBuffer sb = new StringBuffer();
+    for (int i = result.size() - 1; i >= 0; i--) {
+      sb.append(result.get(i));
+    }
+    return sb.toString();
+  }
+
+  public static String generateId() {
+    return encode(System.currentTimeMillis() + new Random().nextInt());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
new file mode 100644
index 0000000..6153f49
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * TODO(moon) : add description.
+ */
+public class Util {
+  private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
+  private static final String GIT_PROPERTIES_COMMIT_ID_KEY = 
"git.commit.id.abbrev";
+  private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";
+
+  private static Properties projectProperties;
+  private static Properties gitProperties;
+
+  static {
+    projectProperties = new Properties();
+    gitProperties = new Properties();
+    try {
+      
projectProperties.load(Util.class.getResourceAsStream("/project.properties"));
+      gitProperties.load(Util.class.getResourceAsStream("/git.properties"));
+    } catch (IOException e) {
+      //Fail to read project.properties
+    }
+  }
+
+  /**
+   * Get Zeppelin version
+   *
+   * @return Current Zeppelin version
+   */
+  public static String getVersion() {
+    return 
StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY),
+            StringUtils.EMPTY);
+  }
+
+  /**
+   * Get Zeppelin Git latest commit id
+   *
+   * @return Latest Zeppelin commit id
+   */
+  public static String getGitCommitId() {
+    return 
StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY),
+            StringUtils.EMPTY);
+  }
+
+  /**
+   * Get Zeppelin Git latest commit timestamp
+   *
+   * @return Latest Zeppelin commit timestamp
+   */
+  public static String getGitTimestamp() {
+    return 
StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY),
+            StringUtils.EMPTY);
+  }
+}

Reply via email to