http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
new file mode 100644
index 0000000..95235e5
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -0,0 +1,975 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterEnv;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+public class RemoteInterpreterTest {
+
+
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+
+  @Before
+  public void setUp() throws Exception {
+    intpGroup = new InterpreterGroup();
+    env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    intpGroup.close();
+  }
+
+  private RemoteInterpreter createMockInterpreterA(Properties p) {
+    return createMockInterpreterA(p, "note");
+  }
+
+  private RemoteInterpreter createMockInterpreterA(Properties p, String 
noteId) {
+    return new RemoteInterpreter(
+        p,
+        noteId,
+        MockInterpreterA.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false);
+  }
+
+  private RemoteInterpreter createMockInterpreterB(Properties p) {
+    return createMockInterpreterB(p, "note");
+  }
+
+  private RemoteInterpreter createMockInterpreterB(Properties p, String 
noteId) {
+    return new RemoteInterpreter(
+        p,
+        noteId,
+        MockInterpreterB.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false);
+  }
+
+  @Test
+  public void testRemoteInterperterCall() throws TTransportException, 
IOException {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpB = createMockInterpreterB(p);
+
+    intpGroup.get("note").add(intpB);
+    intpB.setInterpreterGroup(intpGroup);
+
+
+    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
+    process.equals(intpB.getInterpreterProcess());
+
+    assertFalse(process.isRunning());
+    assertEquals(0, process.getNumIdleClient());
+    assertEquals(0, process.referenceCount());
+
+    intpA.open(); // initializa all interpreters in the same group
+    assertTrue(process.isRunning());
+    assertEquals(1, process.getNumIdleClient());
+    assertEquals(1, process.referenceCount());
+
+    intpA.interpret("1",
+        new InterpreterContext(
+            "note",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+
+    intpB.open();
+    assertEquals(1, process.referenceCount());
+
+    intpA.close();
+    assertEquals(0, process.referenceCount());
+    intpB.close();
+    assertEquals(0, process.referenceCount());
+
+    assertFalse(process.isRunning());
+
+  }
+
+  @Test
+  public void testExecuteIncorrectPrecode() throws TTransportException, 
IOException {
+    Properties p = new Properties();
+    p.put("zeppelin.MockInterpreterA.precode", "fail test");
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
+
+    intpA.open();
+    
+    InterpreterResult result = intpA.interpret("1",
+        new InterpreterContext(
+            "note",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+
+
+
+    intpA.close();
+    assertEquals(Code.ERROR, result.code());
+  }
+
+  @Test
+  public void testExecuteCorrectPrecode() throws TTransportException, 
IOException {
+    Properties p = new Properties();
+    p.put("zeppelin.MockInterpreterA.precode", "2");
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
+
+    intpA.open();
+
+    InterpreterResult result = intpA.interpret("1",
+        new InterpreterContext(
+            "note",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+
+
+
+    intpA.close();
+    assertEquals(Code.SUCCESS, result.code());
+    assertEquals("1", result.message().get(0).getData());
+  }
+
+  @Test
+  public void testRemoteInterperterErrorStatus() throws TTransportException, 
IOException {
+    Properties p = new Properties();
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    InterpreterResult ret = intpA.interpret("non numeric value",
+        new InterpreterContext(
+            "noteId",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+
+    assertEquals(Code.ERROR, ret.code());
+  }
+
+  @Test
+  public void testRemoteSchedulerSharing() throws TTransportException, 
IOException {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterA.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpB = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterB.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.get("note").add(intpB);
+    intpB.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    intpB.open();
+
+    long start = System.currentTimeMillis();
+    InterpreterResult ret = intpA.interpret("500",
+        new InterpreterContext(
+            "note",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+    assertEquals("500", ret.message().get(0).getData());
+
+    ret = intpB.interpret("500",
+        new InterpreterContext(
+            "note",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+    assertEquals("1000", ret.message().get(0).getData());
+    long end = System.currentTimeMillis();
+    assertTrue(end - start >= 1000);
+
+
+    intpA.close();
+    intpB.close();
+  }
+
+  @Test
+  public void testRemoteSchedulerSharingSubmit() throws TTransportException, 
IOException, InterruptedException {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    final RemoteInterpreter intpB = createMockInterpreterB(p);
+
+    intpGroup.get("note").add(intpB);
+    intpB.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    intpB.open();
+
+    long start = System.currentTimeMillis();
+    Job jobA = new Job("jobA", null) {
+      private Object r;
+
+      @Override
+      public Object getReturn() {
+        return r;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.r = results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        return intpA.interpret("500",
+            new InterpreterContext(
+                "note",
+                "jobA",
+                null,
+                "title",
+                "text",
+                new AuthenticationInfo(),
+                new HashMap<String, Object>(),
+                new GUI(),
+                new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
+                new LinkedList<InterpreterContextRunner>(), null));
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+    };
+    intpA.getScheduler().submit(jobA);
+
+    Job jobB = new Job("jobB", null) {
+
+      private Object r;
+
+      @Override
+      public Object getReturn() {
+        return r;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.r = results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        return intpB.interpret("500",
+            new InterpreterContext(
+                "note",
+                "jobB",
+                null,
+                "title",
+                "text",
+                new AuthenticationInfo(),
+                new HashMap<String, Object>(),
+                new GUI(),
+                new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
+                new LinkedList<InterpreterContextRunner>(), null));
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+    };
+    intpB.getScheduler().submit(jobB);
+    // wait until both job finished
+    while (jobA.getStatus() != Status.FINISHED ||
+           jobB.getStatus() != Status.FINISHED) {
+      Thread.sleep(100);
+    }
+    long end = System.currentTimeMillis();
+    assertTrue(end - start >= 1000);
+
+    assertEquals("1000", ((InterpreterResult) 
jobB.getReturn()).message().get(0).getData());
+
+    intpA.close();
+    intpB.close();
+  }
+
+  @Test
+  public void testRunOrderPreserved() throws InterruptedException {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    int concurrency = 3;
+    final List<InterpreterResultMessage> results = new LinkedList<>();
+
+    Scheduler scheduler = intpA.getScheduler();
+    for (int i = 0; i < concurrency; i++) {
+      final String jobId = Integer.toString(i);
+      scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
+        private Object r;
+
+        @Override
+        public Object getReturn() {
+          return r;
+        }
+
+        @Override
+        public void setResult(Object results) {
+          this.r = results;
+        }
+
+        @Override
+        public int progress() {
+          return 0;
+        }
+
+        @Override
+        public Map<String, Object> info() {
+          return null;
+        }
+
+        @Override
+        protected Object jobRun() throws Throwable {
+          InterpreterResult ret = intpA.interpret(getJobName(), new 
InterpreterContext(
+              "note",
+              jobId,
+              null,
+              "title",
+              "text",
+              new AuthenticationInfo(),
+              new HashMap<String, Object>(),
+              new GUI(),
+              new AngularObjectRegistry(intpGroup.getId(), null),
+              new LocalResourcePool("pool1"),
+              new LinkedList<InterpreterContextRunner>(), null));
+
+          synchronized (results) {
+            results.addAll(ret.message());
+            results.notify();
+          }
+          return null;
+        }
+
+        @Override
+        protected boolean jobAbort() {
+          return false;
+        }
+
+      });
+    }
+
+    // wait for job finished
+    synchronized (results) {
+      while (results.size() != concurrency) {
+        results.wait(300);
+      }
+    }
+
+    int i = 0;
+    for (InterpreterResultMessage result : results) {
+      assertEquals(Integer.toString(i++), result.getData());
+    }
+    assertEquals(concurrency, i);
+
+    intpA.close();
+  }
+
+
+  @Test
+  public void testRunParallel() throws InterruptedException {
+    Properties p = new Properties();
+    p.put("parallel", "true");
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    int concurrency = 4;
+    final int timeToSleep = 1000;
+    final List<InterpreterResultMessage> results = new LinkedList<>();
+    long start = System.currentTimeMillis();
+
+    Scheduler scheduler = intpA.getScheduler();
+    for (int i = 0; i < concurrency; i++) {
+      final String jobId = Integer.toString(i);
+      scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) {
+        private Object r;
+
+        @Override
+        public Object getReturn() {
+          return r;
+        }
+
+        @Override
+        public void setResult(Object results) {
+          this.r = results;
+        }
+
+        @Override
+        public int progress() {
+          return 0;
+        }
+
+        @Override
+        public Map<String, Object> info() {
+          return null;
+        }
+
+        @Override
+        protected Object jobRun() throws Throwable {
+          String stmt = Integer.toString(timeToSleep);
+          InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext(
+              "note",
+              jobId,
+              null,
+              "title",
+              "text",
+              new AuthenticationInfo(),
+              new HashMap<String, Object>(),
+              new GUI(),
+              new AngularObjectRegistry(intpGroup.getId(), null),
+              new LocalResourcePool("pool1"),
+              new LinkedList<InterpreterContextRunner>(), null));
+
+          synchronized (results) {
+            results.addAll(ret.message());
+            results.notify();
+          }
+          return stmt;
+        }
+
+        @Override
+        protected boolean jobAbort() {
+          return false;
+        }
+
+      });
+    }
+
+    // wait for job finished
+    synchronized (results) {
+      while (results.size() != concurrency) {
+        results.wait(300);
+      }
+    }
+
+    long end = System.currentTimeMillis();
+
+    assertTrue(end - start < timeToSleep * concurrency);
+
+    intpA.close();
+  }
+
+  @Test
+  public void testInterpreterGroupResetBeforeProcessStarts() {
+    Properties p = new Properties();
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpA.setInterpreterGroup(intpGroup);
+    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+
+    intpA.setInterpreterGroup(new 
InterpreterGroup(intpA.getInterpreterGroup().getId()));
+    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
+
+    assertNotSame(processA.hashCode(), processB.hashCode());
+  }
+
+  @Test
+  public void testInterpreterGroupResetAfterProcessFinished() {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpA.setInterpreterGroup(intpGroup);
+    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+    intpA.open();
+
+    processA.dereference();    // intpA.close();
+
+    intpA.setInterpreterGroup(new 
InterpreterGroup(intpA.getInterpreterGroup().getId()));
+    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
+
+    assertNotSame(processA.hashCode(), processB.hashCode());
+  }
+
+  @Test
+  public void testInterpreterGroupResetDuringProcessRunning() throws 
InterruptedException {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    Job jobA = new Job("jobA", null) {
+      private Object r;
+
+      @Override
+      public Object getReturn() {
+        return r;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.r = results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        return intpA.interpret("2000",
+            new InterpreterContext(
+                "note",
+                "jobA",
+                null,
+                "title",
+                "text",
+                new AuthenticationInfo(),
+                new HashMap<String, Object>(),
+                new GUI(),
+                new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
+                new LinkedList<InterpreterContextRunner>(), null));
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+    };
+    intpA.getScheduler().submit(jobA);
+
+    // wait for job started
+    while (intpA.getScheduler().getJobsRunning().size() == 0) {
+      Thread.sleep(100);
+    }
+
+    // restart interpreter
+    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+    intpA.close();
+
+    InterpreterGroup newInterpreterGroup =
+        new InterpreterGroup(intpA.getInterpreterGroup().getId());
+    newInterpreterGroup.put("note", new LinkedList<Interpreter>());
+
+    intpA.setInterpreterGroup(newInterpreterGroup);
+    intpA.open();
+    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
+
+    assertNotSame(processA.hashCode(), processB.hashCode());
+
+  }
+
+  @Test
+  public void 
testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpB = createMockInterpreterB(p);
+
+    intpGroup.get("note").add(intpB);
+    intpB.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    intpB.open();
+
+    assertEquals(intpA.getScheduler(), intpB.getScheduler());
+  }
+
+  @Test
+  public void testMultiInterpreterSession() {
+    Properties p = new Properties();
+    intpGroup.put("sessionA", new LinkedList<Interpreter>());
+    intpGroup.put("sessionB", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA");
+    intpGroup.get("sessionA").add(intpAsessionA);
+    intpAsessionA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA");
+    intpGroup.get("sessionA").add(intpBsessionA);
+    intpBsessionA.setInterpreterGroup(intpGroup);
+
+    intpAsessionA.open();
+    intpBsessionA.open();
+
+    assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler());
+
+    RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB");
+    intpGroup.get("sessionB").add(intpAsessionB);
+    intpAsessionB.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB");
+    intpGroup.get("sessionB").add(intpBsessionB);
+    intpBsessionB.setInterpreterGroup(intpGroup);
+
+    intpAsessionB.open();
+    intpBsessionB.open();
+
+    assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
+    assertNotEquals(intpAsessionA.getScheduler(), 
intpAsessionB.getScheduler());
+  }
+
+  @Test
+  public void should_push_local_angular_repo_to_remote() throws Exception {
+    //Given
+    final Client client = Mockito.mock(Client.class);
+    final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), 
"noteId",
+        MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 
10 * 1000, null,
+        null, "anonymous", false);
+    final AngularObjectRegistry registry = new AngularObjectRegistry("spark", 
null);
+    registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
+    final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
+    interpreterGroup.setAngularObjectRegistry(registry);
+    intr.setInterpreterGroup(interpreterGroup);
+
+    final java.lang.reflect.Type registryType = new TypeToken<Map<String,
+                Map<String, AngularObject>>>() {}.getType();
+    final Gson gson = new Gson();
+    final String expected = gson.toJson(registry.getRegistry(), registryType);
+
+    //When
+    intr.pushAngularObjectRegistryToRemote(client);
+
+    //Then
+    Mockito.verify(client).angularRegistryPush(expected);
+  }
+
+  @Test
+  public void testEnvStringPattern() {
+    assertFalse(RemoteInterpreterUtils.isEnvString(null));
+    assertFalse(RemoteInterpreterUtils.isEnvString(""));
+    assertFalse(RemoteInterpreterUtils.isEnvString("abcDEF"));
+    assertFalse(RemoteInterpreterUtils.isEnvString("ABC-DEF"));
+    assertTrue(RemoteInterpreterUtils.isEnvString("ABCDEF"));
+    assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF"));
+    assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF123"));
+  }
+
+  @Test
+  public void testEnvronmentAndPropertySet() {
+    Properties p = new Properties();
+    p.setProperty("MY_ENV1", "env value 1");
+    p.setProperty("my.property.1", "property value 1");
+
+    RemoteInterpreter intp = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterEnv.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intp);
+    intp.setInterpreterGroup(intpGroup);
+
+    intp.open();
+
+    InterpreterContext context = new InterpreterContext(
+        "noteId",
+        "id",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("pool1"),
+        new LinkedList<InterpreterContextRunner>(), null);
+
+
+    assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", 
context).message().get(0).getData());
+    assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", 
context).code());
+    assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", 
context).code());
+    assertEquals("property value 1", intp.interpret("getProperty 
my.property.1", context).message().get(0).getData());
+
+    intp.close();
+  }
+
+  @Test
+  public void testSetProgress() throws InterruptedException {
+    // given MockInterpreterA set progress through InterpreterContext
+    Properties p = new Properties();
+    p.setProperty("progress", "50");
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    final InterpreterContext context1 = new InterpreterContext(
+        "noteId",
+        "id1",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("pool1"),
+        new LinkedList<InterpreterContextRunner>(), null);
+
+    InterpreterContext context2 = new InterpreterContext(
+        "noteId",
+        "id2",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("pool1"),
+        new LinkedList<InterpreterContextRunner>(), null);
+
+
+    assertEquals(0, intpA.getProgress(context1));
+    assertEquals(0, intpA.getProgress(context2));
+
+    // when interpreter update progress through InterpreterContext
+    Thread t = new Thread() {
+      public void run() {
+        InterpreterResult ret = intpA.interpret("1000", context1);
+      }
+    };
+    t.start();
+
+    // then progress need to be updated in given context
+    while(intpA.getProgress(context1) == 0) Thread.yield();
+    assertEquals(50, intpA.getProgress(context1));
+    assertEquals(0, intpA.getProgress(context2));
+
+    t.join();
+    assertEquals(0, intpA.getProgress(context1));
+    assertEquals(0, intpA.getProgress(context2));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
new file mode 100644
index 0000000..975d6ea
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.Test;
+
+public class RemoteInterpreterUtilsTest {
+
+  @Test
+  public void testFindRandomAvailablePortOnAllLocalInterfaces() throws 
IOException {
+    
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() 
> 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
new file mode 100644
index 0000000..50d9888
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+public class MockInterpreterA extends Interpreter {
+
+  private String lastSt;
+
+  public MockInterpreterA(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    //new RuntimeException().printStackTrace();
+  }
+
+  @Override
+  public void close() {
+  }
+
+  public String getLastStatement() {
+    return lastSt;
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    if (property.containsKey("progress")) {
+      context.setProgress(Integer.parseInt(getProperty("progress")));
+    }
+    try {
+      Thread.sleep(Long.parseLong(st));
+      this.lastSt = st;
+    } catch (NumberFormatException | InterruptedException e) {
+      throw new InterpreterException(e);
+    }
+    return new InterpreterResult(Code.SUCCESS, st);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    if (getProperty("parallel") != null && 
getProperty("parallel").equals("true")) {
+      return 
SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + 
this.hashCode(), 10);
+    } else {
+      return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + 
this.hashCode());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
new file mode 100644
index 0000000..d4b26ad
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+
+public class MockInterpreterAngular extends Interpreter {
+
+  AtomicInteger numWatch = new AtomicInteger(0);
+
+  public MockInterpreterAngular(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] stmt = st.split(" ");
+    String cmd = stmt[0];
+    String name = null;
+    if (stmt.length >= 2) {
+      name = stmt[1];
+    }
+    String value = null;
+    if (stmt.length == 3) {
+      value = stmt[2];
+    }
+
+    AngularObjectRegistry registry = context.getAngularObjectRegistry();
+
+    if (cmd.equals("add")) {
+      registry.add(name, value, context.getNoteId(), null);
+      registry.get(name, context.getNoteId(), null).addWatcher(new 
AngularObjectWatcher
+              (null) {
+
+        @Override
+        public void watch(Object oldObject, Object newObject,
+            InterpreterContext context) {
+          numWatch.incrementAndGet();
+        }
+
+      });
+    } else if (cmd.equalsIgnoreCase("update")) {
+      registry.get(name, context.getNoteId(), null).set(value);
+    } else if (cmd.equals("remove")) {
+      registry.remove(name, context.getNoteId(), null);
+    }
+
+    try {
+      Thread.sleep(500); // wait for watcher executed
+    } catch (InterruptedException e) {
+      logger.error("Exception in MockInterpreterAngular while interpret 
Thread.sleep", e);
+    }
+
+    String msg = registry.getAll(context.getNoteId(), null).size() + " " + 
Integer.toString(numWatch
+            .get());
+    return new InterpreterResult(Code.SUCCESS, msg);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
new file mode 100644
index 0000000..7103335
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+
+public class MockInterpreterB extends Interpreter {
+
+  public MockInterpreterB(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    //new RuntimeException().printStackTrace();
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    MockInterpreterA intpA = getInterpreterA();
+    String intpASt = intpA.getLastStatement();
+    long timeToSleep = Long.parseLong(st);
+    if (intpASt != null) {
+      timeToSleep += Long.parseLong(intpASt);
+    }
+    try {
+      Thread.sleep(timeToSleep);
+    } catch (NumberFormatException | InterruptedException e) {
+      throw new InterpreterException(e);
+    }
+    return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  public MockInterpreterA getInterpreterA() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+    synchronized (interpreterGroup) {
+      for (List<Interpreter> interpreters : interpreterGroup.values()) {
+        boolean belongsToSameNoteGroup = false;
+        MockInterpreterA a = null;
+        for (Interpreter intp : interpreters) {
+          if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
+            Interpreter p = intp;
+            while (p instanceof WrappedInterpreter) {
+              p = ((WrappedInterpreter) p).getInnerInterpreter();
+            }
+            a = (MockInterpreterA) p;
+          }
+
+          Interpreter p = intp;
+          while (p instanceof WrappedInterpreter) {
+            p = ((WrappedInterpreter) p).getInnerInterpreter();
+          }
+          if (this == p) {
+            belongsToSameNoteGroup = true;
+          }
+        }
+        if (belongsToSameNoteGroup) {
+          return a;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    MockInterpreterA intpA = getInterpreterA();
+    if (intpA != null) {
+      return intpA.getScheduler();
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
new file mode 100644
index 0000000..12e11f7
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+
+public class MockInterpreterEnv extends Interpreter {
+
+  public MockInterpreterEnv(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] cmd = st.split(" ");
+    if (cmd[0].equals("getEnv")) {
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, 
System.getenv(cmd[1]));
+    } else if (cmd[0].equals("getProperty")){
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, 
System.getProperty(cmd[1]));
+    } else {
+      return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]);
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + 
this.hashCode());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
new file mode 100644
index 0000000..349315c
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * MockInterpreter to test outputstream
+ */
+public class MockInterpreterOutputStream extends Interpreter {
+  private String lastSt;
+
+  public MockInterpreterOutputStream(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    //new RuntimeException().printStackTrace();
+  }
+
+  @Override
+  public void close() {
+  }
+
+  public String getLastStatement() {
+    return lastSt;
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] ret = st.split(":");
+    try {
+      if (ret[1] != null) {
+        context.out.write(ret[1]);
+      }
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
+    return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), 
(ret.length > 2) ?
+            ret[2] : "");
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + 
this.hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
new file mode 100644
index 0000000..c4ff6ab
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+
+public class MockInterpreterResourcePool extends Interpreter {
+
+  AtomicInteger numWatch = new AtomicInteger(0);
+
+  public MockInterpreterResourcePool(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] stmt = st.split(" ");
+    String cmd = stmt[0];
+    String noteId = null;
+    String paragraphId = null;
+    String name = null;
+    if (stmt.length >= 2) {
+      String[] npn = stmt[1].split(":");
+      if (npn.length >= 3) {
+        noteId = npn[0];
+        paragraphId = npn[1];
+        name = npn[2];
+      } else {
+        name = stmt[1];
+      }
+    }
+    String value = null;
+    if (stmt.length >= 3) {
+      value = stmt[2];
+    }
+
+    ResourcePool resourcePool = context.getResourcePool();
+    Object ret = null;
+    if (cmd.equals("put")) {
+      resourcePool.put(noteId, paragraphId, name, value);
+    } else if (cmd.equalsIgnoreCase("get")) {
+      Resource resource = resourcePool.get(noteId, paragraphId, name);
+      if (resource != null) {
+        ret = resourcePool.get(noteId, paragraphId, name).get();
+      } else {
+        ret = "";
+      }
+    } else if (cmd.equals("remove")) {
+      ret = resourcePool.remove(noteId, paragraphId, name);
+    } else if (cmd.equals("getAll")) {
+      ret = resourcePool.getAll();
+    } else if (cmd.equals("invoke")) {
+      Resource resource = resourcePool.get(noteId, paragraphId, name);
+      if (stmt.length >=4) {
+        Resource res = resource.invokeMethod(value, null, null, stmt[3]);
+        ret = res.get();
+      } else {
+        ret = resource.invokeMethod(value, null, null);
+      }
+    }
+
+    try {
+      Thread.sleep(500); // wait for watcher executed
+    } catch (InterruptedException e) {
+    }
+
+    Gson gson = new Gson();
+    return new InterpreterResult(Code.SUCCESS, gson.toJson(ret));
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
new file mode 100644
index 0000000..5632513
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterInfo;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.DefaultInterpreterProperty;
+import org.apache.zeppelin.interpreter.InterpreterProperty;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
+import org.apache.zeppelin.interpreter.mock.MockInterpreter11;
+import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class NoteInterpreterLoaderTest {
+
+  private File tmpDir;
+  private ZeppelinConfiguration conf;
+  private InterpreterFactory factory;
+  private InterpreterSettingManager interpreterSettingManager;
+  private DependencyResolver depResolver;
+
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = new 
File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
+    tmpDir.mkdirs();
+    new File(tmpDir, "conf").mkdirs();
+
+    System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), 
tmpDir.getAbsolutePath());
+
+    conf = ZeppelinConfiguration.create();
+
+    depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + 
"/local-repo");
+    interpreterSettingManager = new InterpreterSettingManager(conf, 
depResolver, new InterpreterOption(true));
+    factory = new InterpreterFactory(conf, null, null, null, depResolver, 
false, interpreterSettingManager);
+
+    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), 
"mock1", true, Maps.<String, Object>newHashMap()));
+    interpreterInfos.add(new 
InterpreterInfo(MockInterpreter11.class.getName(), "mock11", false, 
Maps.<String, Object>newHashMap()));
+    ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
+    interpreterInfos2.add(new 
InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, Maps.<String, 
Object>newHashMap()));
+
+    interpreterSettingManager.add("group1", interpreterInfos, 
Lists.<Dependency>newArrayList(), new InterpreterOption(), Maps.<String, 
DefaultInterpreterProperty>newHashMap(), "mock", null);
+    interpreterSettingManager.add("group2", interpreterInfos2, 
Lists.<Dependency>newArrayList(), new InterpreterOption(), Maps.<String, 
DefaultInterpreterProperty>newHashMap(), "mock", null);
+
+    interpreterSettingManager.createNewSetting("group1", "group1", 
Lists.<Dependency>newArrayList(), new InterpreterOption(), new HashMap<String, 
InterpreterProperty>());
+    interpreterSettingManager.createNewSetting("group2", "group2", 
Lists.<Dependency>newArrayList(), new InterpreterOption(), new HashMap<String, 
InterpreterProperty>());
+
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    delete(tmpDir);
+    Interpreter.registeredInterpreters.clear();
+  }
+
+  @Test
+  public void testGetInterpreter() throws IOException {
+    interpreterSettingManager.setInterpreters("user", "note", 
interpreterSettingManager.getDefaultInterpreterSettingList());
+
+    // when there're no interpreter selection directive
+    assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", 
factory.getInterpreter("user", "note", null).getClassName());
+    assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", 
factory.getInterpreter("user", "note", "").getClassName());
+    assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", 
factory.getInterpreter("user", "note", " ").getClassName());
+
+    // when group name is omitted
+    assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", 
factory.getInterpreter("user", "note", "mock11").getClassName());
+
+    // when 'name' is ommitted
+    assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", 
factory.getInterpreter("user", "note", "group1").getClassName());
+    assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", 
factory.getInterpreter("user", "note", "group2").getClassName());
+
+    // when nothing is ommitted
+    assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", 
factory.getInterpreter("user", "note", "group1.mock1").getClassName());
+    assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", 
factory.getInterpreter("user", "note", "group1.mock11").getClassName());
+    assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", 
factory.getInterpreter("user", "note", "group2.mock2").getClassName());
+
+    interpreterSettingManager.closeNote("user", "note");
+  }
+
+  @Test
+  public void testNoteSession() throws IOException {
+    interpreterSettingManager.setInterpreters("user", "noteA", 
interpreterSettingManager.getDefaultInterpreterSettingList());
+    
interpreterSettingManager.getInterpreterSettings("noteA").get(0).getOption().setPerNote(InterpreterOption.SCOPED);
+
+    interpreterSettingManager.setInterpreters("user", "noteB", 
interpreterSettingManager.getDefaultInterpreterSettingList());
+    
interpreterSettingManager.getInterpreterSettings("noteB").get(0).getOption().setPerNote(InterpreterOption.SCOPED);
+
+    // interpreters are not created before accessing it
+    
assertNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user",
 "noteA").get("noteA"));
+    
assertNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user",
 "noteB").get("noteB"));
+
+    factory.getInterpreter("user", "noteA", null).open();
+    factory.getInterpreter("user", "noteB", null).open();
+
+    assertTrue(
+        factory.getInterpreter("user", "noteA", 
null).getInterpreterGroup().getId().equals(
+        factory.getInterpreter("user", "noteB", 
null).getInterpreterGroup().getId()));
+
+    // interpreters are created after accessing it
+    
assertNotNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user",
 "noteA").get("noteA"));
+    
assertNotNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user",
 "noteB").get("noteB"));
+
+    // invalid close
+    interpreterSettingManager.closeNote("user", "note");
+    
assertNotNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user",
 "shared_process").get("noteA"));
+    
assertNotNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user",
 "shared_process").get("noteB"));
+
+    // when
+    interpreterSettingManager.closeNote("user", "noteA");
+    interpreterSettingManager.closeNote("user", "noteB");
+
+    // interpreters are destroyed after close
+    
assertNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user",
 "shared_process").get("noteA"));
+    
assertNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user",
 "shared_process").get("noteB"));
+
+  }
+
+  @Test
+  public void testNotePerInterpreterProcess() throws IOException {
+    interpreterSettingManager.setInterpreters("user", "noteA", 
interpreterSettingManager.getDefaultInterpreterSettingList());
+    
interpreterSettingManager.getInterpreterSettings("noteA").get(0).getOption().setPerNote(InterpreterOption.ISOLATED);
+
+    interpreterSettingManager.setInterpreters("user", "noteB", 
interpreterSettingManager.getDefaultInterpreterSettingList());
+    
interpreterSettingManager.getInterpreterSettings("noteB").get(0).getOption().setPerNote(InterpreterOption.ISOLATED);
+
+    // interpreters are not created before accessing it
+    
assertNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user",
 "noteA").get("shared_session"));
+    
assertNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user",
 "noteB").get("shared_session"));
+
+    factory.getInterpreter("user", "noteA", null).open();
+    factory.getInterpreter("user", "noteB", null).open();
+
+    // per note interpreter process
+    assertFalse(
+        factory.getInterpreter("user", "noteA", 
null).getInterpreterGroup().getId().equals(
+        factory.getInterpreter("user", "noteB", 
null).getInterpreterGroup().getId()));
+
+    // interpreters are created after accessing it
+    
assertNotNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user",
 "noteA").get("shared_session"));
+    
assertNotNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user",
 "noteB").get("shared_session"));
+
+    // when
+    interpreterSettingManager.closeNote("user", "noteA");
+    interpreterSettingManager.closeNote("user", "noteB");
+
+    // interpreters are destroyed after close
+    
assertNull(interpreterSettingManager.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user",
 "noteA").get("shared_session"));
+    
assertNull(interpreterSettingManager.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user",
 "noteB").get("shared_session"));
+  }
+
+  @Test
+  public void testNoteInterpreterCloseForAll() throws IOException {
+    interpreterSettingManager.setInterpreters("user", "FitstNote", 
interpreterSettingManager.getDefaultInterpreterSettingList());
+    
interpreterSettingManager.getInterpreterSettings("FitstNote").get(0).getOption().setPerNote(InterpreterOption.SCOPED);
+
+    interpreterSettingManager.setInterpreters("user", "yourFirstNote", 
interpreterSettingManager.getDefaultInterpreterSettingList());
+    
interpreterSettingManager.getInterpreterSettings("yourFirstNote").get(0).getOption().setPerNote(InterpreterOption.ISOLATED);
+
+    // interpreters are not created before accessing it
+    
assertNull(interpreterSettingManager.getInterpreterSettings("FitstNote").get(0).getInterpreterGroup("user",
 "FitstNote").get("FitstNote"));
+    
assertNull(interpreterSettingManager.getInterpreterSettings("yourFirstNote").get(0).getInterpreterGroup("user",
 "yourFirstNote").get("yourFirstNote"));
+
+    Interpreter firstNoteIntp = factory.getInterpreter("user", "FitstNote", 
"group1.mock1");
+    Interpreter yourFirstNoteIntp = factory.getInterpreter("user", 
"yourFirstNote", "group1.mock1");
+
+    firstNoteIntp.open();
+    yourFirstNoteIntp.open();
+
+    assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
+    assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
+
+    interpreterSettingManager.closeNote("user", "FitstNote");
+
+    assertFalse(((LazyOpenInterpreter)firstNoteIntp).isOpen());
+    assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
+
+    //reopen
+    firstNoteIntp.open();
+
+    assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
+    assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
+
+    // invalid check
+    interpreterSettingManager.closeNote("invalid", "Note");
+
+    assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
+    assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
+
+    // invalid contains value check
+    interpreterSettingManager.closeNote("u", "Note");
+
+    assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
+    assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
+  }
+
+
+  private void delete(File file){
+    if(file.isFile()) file.delete();
+    else if(file.isDirectory()){
+      File [] files = file.listFiles();
+      if(files!=null && files.length>0){
+        for(File f : files){
+          delete(f);
+        }
+      }
+      file.delete();
+    }
+  }
+}

Reply via email to