[GitHub] [zeppelin] Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] Zeppelin on Kubernetes
Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] Zeppelin on Kubernetes URL: https://github.com/apache/zeppelin/pull/3240#discussion_r280916925 ## File path: zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.launcher; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.commons.exec.*; +import org.apache.commons.io.IOUtils; + +import java.io.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Kubectl { + private final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class); + private final String kubectlCmd; + private final Gson gson = new Gson(); + private String namespace; + + public Kubectl(String kubectlCmd) { +this.kubectlCmd = kubectlCmd; + } + + /** + * Override namespace. 
Otherwise use namespace provided in schema + * @param namespace + */ + public void setNamespace(String namespace) { +this.namespace = namespace; + } + + public String getNamespace() { +return namespace; + } + + public String apply(String spec) throws IOException { +return execAndGet(new String[]{"apply", "-f", "-"}, spec); + } + + public String delete(String spec) throws IOException { +return execAndGet(new String[]{"delete", "-f", "-"}, spec); + } + + public String wait(String resource, String waitFor, int timeoutSec) throws IOException { +try { + return execAndGet(new String[]{ + "wait", + resource, + String.format("--for=%s", waitFor), + String.format("--timeout=%ds", timeoutSec)}); +} catch (IOException e) { + if ("delete".equals(waitFor) && e.getMessage().contains("NotFound")) { +LOGGER.info("{} Not found. Maybe already deleted.", resource); +return ""; + } else { +throw e; + } +} + } + + public ExecuteWatchdog portForward(String resource, String [] ports) throws IOException { +DefaultExecutor executor = new DefaultExecutor(); +CommandLine cmd = new CommandLine(kubectlCmd); +cmd.addArguments("port-forward"); +cmd.addArguments(resource); +cmd.addArguments(ports); + +ExecuteWatchdog watchdog = new ExecuteWatchdog(-1); +executor.setWatchdog(watchdog); + +executor.execute(cmd, new ExecuteResultHandler() { + @Override + public void onProcessComplete(int i) { +LOGGER.info("Port-forward stopped"); + } + + @Override + public void onProcessFailed(ExecuteException e) { +LOGGER.debug("port-forward process exit", e); + } +}); + +return watchdog; + } + + String execAndGet(String [] args) throws IOException { +return execAndGet(args, ""); + } + + @VisibleForTesting + String execAndGet(String [] args, String stdin) throws IOException { +InputStream ins = IOUtils.toInputStream(stdin); +ByteArrayOutputStream stdout = new ByteArrayOutputStream(); +ByteArrayOutputStream stderr = new ByteArrayOutputStream(); +ArrayList argsToOverride = new ArrayList<>(Arrays.asList(args)); + +// 
set namespace +if (namespace != null) { + argsToOverride.add("--namespace=" + namespace); +} + +LOGGER.info("kubectl " + argsToOverride); +LOGGER.debug(stdin); + +try { + int exitCode = execute( + argsToOverride.toArray(new String[0]), + ins, + stdout, + stderr + ); + + if (exitCode == 0) { +String output = new String(stdout.toByteArray()); +return output; + } else { +String output = new String(stderr.toByteArray()); +throw new IOException(String.format("non zero return code (%d). %s", exitCode, output)); + } +} catch (Exception e) { + String output = new String(stderr.toByteArray()); + throw new IOException(output, e); +} + } + + public int execute(String [] args, InputStream stdin, OutputStream stdout, OutputStream stderr) throws IOException { +
[GitHub] [zeppelin] Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] Zeppelin on Kubernetes
Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] Zeppelin on Kubernetes URL: https://github.com/apache/zeppelin/pull/3240#discussion_r280916321 ## File path: zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.launcher; + +import java.io.File; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Interpreter Launcher which use shell script to launch the interpreter process. 
+ */ +public class K8sStandardInterpreterLauncher extends InterpreterLauncher { + + private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class); + private final Kubectl kubectl; + private InterpreterLaunchContext context; + + + public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException { +super(zConf, recoveryStorage); +kubectl = new Kubectl(zConf.getK8sKubectlCmd()); +kubectl.setNamespace(getNamespace()); + } + + @VisibleForTesting + K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage, Kubectl kubectl) { +super(zConf, recoveryStorage); +this.kubectl = kubectl; + } + + + /** + * Check if i'm running inside of kubernetes or not. + * It should return truth regardless of ZeppelinConfiguration.getRunMode(). + * + * Normally, unless Zeppelin is running on Kubernetes, K8sStandardInterpreterLauncher shouldn't even have initialized. + * However, when ZeppelinConfiguration.getRunMode() is force 'k8s', InterpreterSetting.getLauncherPlugin() will try + * to use K8sStandardInterpreterLauncher. This is useful for development. It allows Zeppelin server running on your + * IDE and creates your interpreters in Kubernetes. So any code changes on Zeppelin server or kubernetes yaml spec + * can be applied without re-building docker image. + * @return + */ + boolean isRunningOnKubernetes() { +if (new File("/var/run/secrets/kubernetes.io").exists()) { + return true; +} else { + return false; +} + } + + /** + * Get current namespace + * @throws IOException + */ + String getNamespace() throws IOException { +if (isRunningOnKubernetes()) { + return readFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace", Charset.defaultCharset()).trim(); +} else { + return "default"; +} + } + + /** + * Get hostname. 
It should be the same to Service name (and Pod name) of the Kubernetes + * @return + */ + String getHostname() { +try { + return InetAddress.getLocalHost().getHostName(); +} catch (UnknownHostException e) { + return "localhost"; +} + } + + /** + * get Zeppelin server host dns. + * return ..svc.cluster.local + * @throws IOException + */ + private String getZeppelinServiceHost() throws IOException { +if (isRunningOnKubernetes()) { + return String.format("%s.%s.svc.cluster.local", + getHostname(), // service name and pod name should be the same Review comment: Good point. Notebook serving (https://github.com/apache/zeppelin/pull/3356) has exactly this problem, because TestTask runs Zeppelin using a `Job` and ServingTask runs Zeppelin using a `Deployment`. In the serving implementation, K8sStandardInterpreterLauncher has been modified to look for the `SERVICE_NAME` env variable. Check https://github.com/apache/zeppelin/pull/3356/files#diff-dfad7c26921ec97aa7bca7fc3180408fL109. Does it make sense?
[GitHub] [zeppelin] Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] Zeppelin on Kubernetes
Leemoonsoo commented on a change in pull request #3240: [ZEPPELIN-3840] Zeppelin on Kubernetes URL: https://github.com/apache/zeppelin/pull/3240#discussion_r280914489 ## File path: zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java ## @@ -0,0 +1,382 @@ +package org.apache.zeppelin.interpreter.launcher; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.exec.ExecuteWatchdog; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { + private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class); + private static final int K8S_INTERPRETER_SERVICE_PORT = 12321; + private final Kubectl kubectl; + private final String interpreterGroupId; + private final String interpreterGroupName; + private final String interpreterSettingName; + private final File specTempaltes; + private final String containerImage; + private final Properties properties; + private final Map envs; + private final String zeppelinServiceHost; + private final String zeppelinServiceRpcPort; + + private final Gson gson = new Gson(); + private final String podName; + private final boolean portForward; + private final String sparkImage; + private ExecuteWatchdog portForwardWatchdog; + private int podPort = K8S_INTERPRETER_SERVICE_PORT; + + private AtomicBoolean started = new AtomicBoolean(false); + + public K8sRemoteInterpreterProcess( + Kubectl kubectl, + File 
specTemplates, + String containerImage, + String interpreterGroupId, + String interpreterGroupName, + String interpreterSettingName, + Properties properties, + Map envs, + String zeppelinServiceHost, + String zeppelinServiceRpcPort, + boolean portForward, + String sparkImage, + int connectTimeout + ) { +super(connectTimeout); +this.kubectl = kubectl; +this.specTempaltes = specTemplates; +this.containerImage = containerImage; +this.interpreterGroupId = interpreterGroupId; +this.interpreterGroupName = interpreterGroupName; +this.interpreterSettingName = interpreterSettingName; +this.properties = properties; +this.envs = new HashMap(envs); +this.zeppelinServiceHost = zeppelinServiceHost; +this.zeppelinServiceRpcPort = zeppelinServiceRpcPort; +this.portForward = portForward; +this.sparkImage = sparkImage; +this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6); + } + + + /** + * Get interpreter pod name + * @return + */ + @VisibleForTesting + String getPodName() { +return podName; + } + + @Override + public String getInterpreterSettingName() { +return interpreterSettingName; + } + + @Override + public void start(String userName) throws IOException { +// create new pod +apply(specTempaltes, false); +kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", getConnectTimeout()/1000); + +if (portForward) { + podPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + portForwardWatchdog = kubectl.portForward( + String.format("pod/%s", getPodName()), + new String[] { + String.format("%s:%s", podPort, K8S_INTERPRETER_SERVICE_PORT) + }); +} + +long startTime = System.currentTimeMillis(); + +// wait until interpreter send started message through thrift rpc +synchronized (started) { + if (!started.get()) { +try { + started.wait(getConnectTimeout()); +} catch (InterruptedException e) { + LOGGER.error("Remote interpreter is not accessible"); +} + } +} + +if (!started.get()) { + LOGGER.info( + String.format("Interpreter pod 
creation is time out in %d seconds", + getConnectTimeout()/1000)); +} + +// waits for interpreter thrift rpc server ready +while (System.currentTimeMillis() - startTime < getConnectTimeout()) { + if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) { +break; + } else { +try { + Thread.sleep(1000); +} catch (InterruptedException e) { +} + } +} + } + + @Override + public void stop() { +// delete pod +try { + apply(specTempaltes, true); +} catch (IOException e) {