[GitHub] [zeppelin] Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] Zeppelin on Kubernetes

2019-05-03 Thread GitBox
Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] 
Zeppelin on Kubernetes
URL: https://github.com/apache/zeppelin/pull/3240#discussion_r280916925
 
 

 ##
 File path: 
zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.exec.*;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+public class Kubectl {
+  private final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class);
+  private final String kubectlCmd;
+  private final Gson gson = new Gson();
+  private String namespace;
+
+  public Kubectl(String kubectlCmd) {
+this.kubectlCmd = kubectlCmd;
+  }
+
+  /**
+   * Override namespace. Otherwise use namespace provided in schema
+   * @param namespace
+   */
+  public void setNamespace(String namespace) {
+this.namespace = namespace;
+  }
+
+  public String getNamespace() {
+return namespace;
+  }
+
+  public String apply(String spec) throws IOException {
+return execAndGet(new String[]{"apply", "-f", "-"}, spec);
+  }
+
+  public String delete(String spec) throws IOException {
+return execAndGet(new String[]{"delete", "-f", "-"}, spec);
+  }
+
+  public String wait(String resource, String waitFor, int timeoutSec) throws 
IOException {
+try {
+  return execAndGet(new String[]{
+  "wait",
+  resource,
+  String.format("--for=%s", waitFor),
+  String.format("--timeout=%ds", timeoutSec)});
+} catch (IOException e) {
+  if ("delete".equals(waitFor) && e.getMessage().contains("NotFound")) {
+LOGGER.info("{} Not found. Maybe already deleted.", resource);
+return "";
+  } else {
+throw e;
+  }
+}
+  }
+
+  public ExecuteWatchdog portForward(String resource, String [] ports) throws 
IOException {
+DefaultExecutor executor = new DefaultExecutor();
+CommandLine cmd = new CommandLine(kubectlCmd);
+cmd.addArguments("port-forward");
+cmd.addArguments(resource);
+cmd.addArguments(ports);
+
+ExecuteWatchdog watchdog = new ExecuteWatchdog(-1);
+executor.setWatchdog(watchdog);
+
+executor.execute(cmd, new ExecuteResultHandler() {
+  @Override
+  public void onProcessComplete(int i) {
+LOGGER.info("Port-forward stopped");
+  }
+
+  @Override
+  public void onProcessFailed(ExecuteException e) {
+LOGGER.debug("port-forward process exit", e);
+  }
+});
+
+return watchdog;
+  }
+
+  String execAndGet(String [] args) throws IOException {
+return execAndGet(args, "");
+  }
+
+  @VisibleForTesting
+  String execAndGet(String [] args, String stdin) throws IOException {
+InputStream ins = IOUtils.toInputStream(stdin);
+ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+ArrayList argsToOverride = new ArrayList<>(Arrays.asList(args));
+
+// set namespace
+if (namespace != null) {
+  argsToOverride.add("--namespace=" + namespace);
+}
+
+LOGGER.info("kubectl " + argsToOverride);
+LOGGER.debug(stdin);
+
+try {
+  int exitCode = execute(
+  argsToOverride.toArray(new String[0]),
+  ins,
+  stdout,
+  stderr
+  );
+
+  if (exitCode == 0) {
+String output = new String(stdout.toByteArray());
+return output;
+  } else {
+String output = new String(stderr.toByteArray());
+throw new IOException(String.format("non zero return code (%d). %s", 
exitCode, output));
+  }
+} catch (Exception e) {
+  String output = new String(stderr.toByteArray());
+  throw new IOException(output, e);
+}
+  }
+
+  public int execute(String [] args, InputStream stdin, OutputStream stdout, 
OutputStream stderr) throws IOException {
+

[GitHub] [zeppelin] Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] Zeppelin on Kubernetes

2019-05-03 Thread GitBox
Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] 
Zeppelin on Kubernetes
URL: https://github.com/apache/zeppelin/pull/3240#discussion_r280916321
 
 

 ##
 File path: 
zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Interpreter launcher that launches interpreter processes on Kubernetes.
+ */
+public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
+  private final Kubectl kubectl;
+  private InterpreterLaunchContext context;
+
+
+  public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, 
RecoveryStorage recoveryStorage) throws IOException {
+super(zConf, recoveryStorage);
+kubectl = new Kubectl(zConf.getK8sKubectlCmd());
+kubectl.setNamespace(getNamespace());
+  }
+
+  @VisibleForTesting
+  K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage 
recoveryStorage, Kubectl kubectl) {
+super(zConf, recoveryStorage);
+this.kubectl = kubectl;
+  }
+
+
+  /**
+   * Check if i'm running inside of kubernetes or not.
+   * It should return truth regardless of ZeppelinConfiguration.getRunMode().
+   *
+   * Normally, unless Zeppelin is running on Kubernetes, 
K8sStandardInterpreterLauncher shouldn't even have initialized.
+   * However, when ZeppelinConfiguration.getRunMode() is force 'k8s', 
InterpreterSetting.getLauncherPlugin() will try
+   * to use K8sStandardInterpreterLauncher. This is useful for development. It 
allows Zeppelin server running on your
+   * IDE and creates your interpreters in Kubernetes. So any code changes on 
Zeppelin server or kubernetes yaml spec
+   * can be applied without re-building docker image.
+   * @return
+   */
+  boolean isRunningOnKubernetes() {
+if (new File("/var/run/secrets/kubernetes.io").exists()) {
+  return true;
+} else {
+  return false;
+}
+  }
+
+  /**
+   * Get current namespace
+   * @throws IOException
+   */
+  String getNamespace() throws IOException {
+if (isRunningOnKubernetes()) {
+  return 
readFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace", 
Charset.defaultCharset()).trim();
+} else {
+  return "default";
+}
+  }
+
+  /**
+   * Get hostname. It should be the same to Service name (and Pod name) of the 
Kubernetes
+   * @return
+   */
+  String getHostname() {
+try {
+  return InetAddress.getLocalHost().getHostName();
+} catch (UnknownHostException e) {
+  return "localhost";
+}
+  }
+
+  /**
+   * get Zeppelin server host dns.
+   * return ..svc.cluster.local
+   * @throws IOException
+   */
+  private String getZeppelinServiceHost() throws IOException {
+if (isRunningOnKubernetes()) {
+  return String.format("%s.%s.svc.cluster.local",
+  getHostname(), // service name and pod name should be the same
 
 Review comment:
   Good point. Notebook serving (https://github.com/apache/zeppelin/pull/3356) 
has exactly this problem: TestTask runs Zeppelin using a `Job`, while ServingTask 
runs Zeppelin using a `Deployment`, so the pod name no longer necessarily matches the Service name.
   
   In the serving implementation, K8sStandardInterpreterLauncher has been modified to 
look for a `SERVICE_NAME` environment variable. See 
https://github.com/apache/zeppelin/pull/3356/files#diff-dfad7c26921ec97aa7bca7fc3180408fL109.
 Does that make sense?


[GitHub] [zeppelin] Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] Zeppelin on Kubernetes

2019-05-03 Thread GitBox
Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] 
Zeppelin on Kubernetes
URL: https://github.com/apache/zeppelin/pull/3240#discussion_r280914489
 
 

 ##
 File path: 
zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
 ##
 @@ -0,0 +1,382 @@
+package org.apache.zeppelin.interpreter.launcher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
+  private static final int K8S_INTERPRETER_SERVICE_PORT = 12321;
+  private final Kubectl kubectl;
+  private final String interpreterGroupId;
+  private final String interpreterGroupName;
+  private final String interpreterSettingName;
+  private final File specTempaltes;
+  private final String containerImage;
+  private final Properties properties;
+  private final Map envs;
+  private final String zeppelinServiceHost;
+  private final String zeppelinServiceRpcPort;
+
+  private final Gson gson = new Gson();
+  private final String podName;
+  private final boolean portForward;
+  private final String sparkImage;
+  private ExecuteWatchdog portForwardWatchdog;
+  private int podPort = K8S_INTERPRETER_SERVICE_PORT;
+
+  private AtomicBoolean started = new AtomicBoolean(false);
+
+  public K8sRemoteInterpreterProcess(
+  Kubectl kubectl,
+  File specTemplates,
+  String containerImage,
+  String interpreterGroupId,
+  String interpreterGroupName,
+  String interpreterSettingName,
+  Properties properties,
+  Map envs,
+  String zeppelinServiceHost,
+  String zeppelinServiceRpcPort,
+  boolean portForward,
+  String sparkImage,
+  int connectTimeout
+  ) {
+super(connectTimeout);
+this.kubectl = kubectl;
+this.specTempaltes = specTemplates;
+this.containerImage = containerImage;
+this.interpreterGroupId = interpreterGroupId;
+this.interpreterGroupName = interpreterGroupName;
+this.interpreterSettingName = interpreterSettingName;
+this.properties = properties;
+this.envs = new HashMap(envs);
+this.zeppelinServiceHost = zeppelinServiceHost;
+this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
+this.portForward = portForward;
+this.sparkImage = sparkImage;
+this.podName = interpreterGroupName.toLowerCase() + "-" + 
getRandomString(6);
+  }
+
+
+  /**
+   * Get interpreter pod name
+   * @return
+   */
+  @VisibleForTesting
+  String getPodName() {
+return podName;
+  }
+
+  @Override
+  public String getInterpreterSettingName() {
+return interpreterSettingName;
+  }
+
+  @Override
+  public void start(String userName) throws IOException {
+// create new pod
+apply(specTempaltes, false);
+kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", 
getConnectTimeout()/1000);
+
+if (portForward) {
+  podPort = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+  portForwardWatchdog = kubectl.portForward(
+  String.format("pod/%s", getPodName()),
+  new String[] {
+  String.format("%s:%s", podPort, K8S_INTERPRETER_SERVICE_PORT)
+  });
+}
+
+long startTime = System.currentTimeMillis();
+
+// wait until interpreter send started message through thrift rpc
+synchronized (started) {
+  if (!started.get()) {
+try {
+  started.wait(getConnectTimeout());
+} catch (InterruptedException e) {
+  LOGGER.error("Remote interpreter is not accessible");
+}
+  }
+}
+
+if (!started.get()) {
+  LOGGER.info(
+  String.format("Interpreter pod creation is time out in %d seconds",
+  getConnectTimeout()/1000));
+}
+
+// waits for interpreter thrift rpc server ready
+while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
+  if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), 
getPort())) {
+break;
+  } else {
+try {
+  Thread.sleep(1000);
+} catch (InterruptedException e) {
+}
+  }
+}
+  }
+
+  @Override
+  public void stop() {
+// delete pod
+try {
+  apply(specTempaltes, true);
+} catch (IOException e) {