Repository: zeppelin Updated Branches: refs/heads/master 0758b7078 -> e25266706
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e2526670/scripts/mahout/add_mahout_interpreters.py ---------------------------------------------------------------------- diff --git a/scripts/mahout/add_mahout_interpreters.py b/scripts/mahout/add_mahout_interpreters.py new file mode 100644 index 0000000..307364d --- /dev/null +++ b/scripts/mahout/add_mahout_interpreters.py @@ -0,0 +1,290 @@ +# /** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. 
# */
"""Add %sparkMahout and %flinkMahout interpreters to an Apache Zeppelin install.

The script copies Zeppelin's existing 'spark' and 'flink' interpreter settings
in conf/interpreter.json, adds the Apache Mahout jars and settings to the copies,
exports MAHOUT_HOME in conf/zeppelin-env.sh and (unless --restart_later is given)
restarts Zeppelin so the new interpreters show up.  If --mahout_home is not given,
the Mahout binary distribution is downloaded and untarred into ZEPPELIN_HOME.
"""

import argparse
import json
import os
import sys

from copy import deepcopy
from os import getcwd
from os.path import isfile
from subprocess import call, check_call
from time import sleep


#######################################################################################################################
# Versions live here so they are (hopefully) easy to bump / maintain.
# If there is demand, some or all of these could become command line arguments as well.
# tar_name and mahout_bin_url are derived from mahout_version so a bump only touches one line.
#######################################################################################################################
mahout_version = "0.12.2"
tar_name = "apache-mahout-distribution-%s.tar.gz" % mahout_version
mahout_bin_url = "http://apache.osuosl.org/mahout/%s/%s" % (mahout_version, tar_name)

parser = argparse.ArgumentParser()

parser.add_argument("--force_download", help="force download Apache Mahout", action="store_true")
parser.add_argument("--restart_later",
                    help="do not restart Zeppelin after adding the interpreters; restart it yourself later",
                    action="store_true")
parser.add_argument("--zeppelin_home", help="path to ZEPPELIN_HOME")
parser.add_argument("--mahout_home", help="path to MAHOUT_HOME, use this if you have already installed Apache Mahout")
# argparse %-formats help strings, so literal percent signs must be written as %%
# (an unescaped '%s'/'%f' here makes --help raise TypeError).
parser.add_argument("--overwrite_existing",
                    help="if %%sparkMahout or %%flinkMahout exist, delete them and create new ones. Otherwise Fail.",
                    action="store_true")


class ZeppelinTerpWrangler(object):
    """Read, modify and write a Zeppelin conf/interpreter.json file.

    Interpreter settings are kept under the top-level 'interpreterSettings'
    mapping, keyed by interpreter id; this class reads/writes the 'name', 'id',
    'properties' and 'dependencies' entries of each setting.
    """

    def __init__(self, interpreter_json_path):
        self.interpreter_json_path = interpreter_json_path
        # Parsed interpreter.json; stays empty until _readTerpJson() is called.
        self.interpreter_json = {}

    def _getTerpID(self, terpName):
        """Return the id of the interpreter setting named terpName, or None if there is none."""
        for terp_id, setting in self.interpreter_json['interpreterSettings'].items():
            if setting.get('name') == terpName:
                return terp_id
        return None

    def _terpExists(self, terpName):
        """Return True if an interpreter setting named terpName exists."""
        return self._getTerpID(terpName) is not None

    def createTerp(self, original_terp_name, new_terp_name, overwrite_existing=True):
        """Create interpreter new_terp_name as a deep copy of original_terp_name.

        The new setting's id is its name.  If a setting named new_terp_name already
        exists it is deleted when overwrite_existing is true; otherwise the program
        exits with status 1.  The program also exits with status 1 when
        original_terp_name does not exist.
        """
        settings = self.interpreter_json['interpreterSettings']
        new_terp_id = new_terp_name

        existing_id = self._getTerpID(new_terp_name)
        if existing_id is not None:
            print("Found existing '%s' interpreter..." % new_terp_name)
            if not overwrite_existing:
                print("exiting program.")
                sys.exit(1)
            print("deleting %s from interpreter.json" % new_terp_name)
            del settings[existing_id]

        orig_terp_id = self._getTerpID(original_terp_name)
        if orig_terp_id is None:
            print("interpreter '%s' not found in %s, exiting program."
                  % (original_terp_name, self.interpreter_json_path))
            sys.exit(1)

        # deepcopy so later edits to the new interpreter never leak into the original one.
        new_setting = deepcopy(settings[orig_terp_id])
        new_setting['name'] = new_terp_name
        new_setting['id'] = new_terp_id
        settings[new_terp_id] = new_setting
        print("created new interpreter '%s' from interpreter '%s'" % (new_terp_name, original_terp_name))

    def _readTerpJson(self):
        """Load interpreter.json from interpreter_json_path into self.interpreter_json."""
        with open(self.interpreter_json_path) as f:
            self.interpreter_json = json.load(f)

    def _writeTerpJson(self):
        """Write self.interpreter_json back to interpreter_json_path (sorted keys, 4-space indent)."""
        # Text mode: json.dump writes str, which a binary-mode file rejects on Python 3.
        # Explicit separators avoid the trailing spaces Python 2 emits when indent is set.
        with open(self.interpreter_json_path, 'w') as f:
            json.dump(self.interpreter_json, f, sort_keys=True, indent=4, separators=(',', ': '))

    def _updateTerpProp(self, terpName, property, value):
        """Set properties[property] = value on the interpreter setting named terpName."""
        terp_id = self._getTerpID(terpName)
        self.interpreter_json['interpreterSettings'][terp_id]['properties'][property] = value

    def _addTerpDep(self, terpName="", dep="", exclusions=None):
        """Append dependency dep to interpreter terpName, de-duplicating by artifact.

        dep is stored as the 'groupArtifactVersion' (in this script either a jar
        path or a group:artifact:version coordinate).  interpreter.json is read
        first if it has not been loaded yet.  When the same 'groupArtifactVersion'
        appears more than once, only its first occurrence is kept.
        """
        if not self.interpreter_json:
            print("no interpreter.json loaded, reading last one downloaded")
            self._readTerpJson()
        terp_id = self._getTerpID(terpName)
        setting = self.interpreter_json['interpreterSettings'][terp_id]

        dep_dict = {
            u'groupArtifactVersion': dep,
            u'local': False
        }
        if exclusions is not None:
            dep_dict[u'exclusions'] = exclusions
        deps = list(setting.get('dependencies', [])) + [dep_dict]

        # De-duplicate on the artifact itself (not on an arbitrary first dict item);
        # entries without a groupArtifactVersion are always kept.
        seen = set()
        new_deps = []
        for d in deps:
            gav = d.get(u'groupArtifactVersion')
            if gav is not None:
                if gav in seen:
                    continue
                seen.add(gav)
            new_deps.append(d)

        setting['dependencies'] = new_deps

    def addMahoutConfig(self, terpName, mahout_home, mahout_version=mahout_version, scala_version="2.10"):
        """Add Apache Mahout jars and settings to the interpreter named terpName.

        Every interpreter gets the mahout-math and mahout-math-scala jars from
        mahout_home.  Interpreters whose name contains 'spark' also get the Mahout
        Spark jars and Kryo serializer settings; those whose name contains 'flink'
        also get the Mahout Flink/HDFS jars, Guava 14.0.1 and a task-slot setting.
        Other interpreters get only the common jars.
        """
        print("updating '%s' with Apache Mahout dependencies and settings" % terpName)

        terpDeps = ["%s/mahout-math-%s.jar" % (mahout_home, mahout_version),
                    "%s/mahout-math-scala_%s-%s.jar" % (mahout_home, scala_version, mahout_version)]
        # Initialised up front so interpreters that are neither Spark nor Flink do not hit a NameError.
        configs = {}

        name = terpName.lower()
        if "spark" in name:
            configs.update({
                "spark.kryo.referenceTracking": "false",
                "spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
                "spark.kryoserializer.buffer": "32k",
                "spark.kryoserializer.buffer.max": "600m",
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
            })
            terpDeps += [
                "%s/mahout-spark_%s-%s-dependency-reduced.jar" % (mahout_home, scala_version, mahout_version),
                "%s/mahout-spark_%s-%s.jar" % (mahout_home, scala_version, mahout_version),
                "%s/mahout-spark-shell_%s-%s.jar" % (mahout_home, scala_version, mahout_version),
            ]

        if "flink" in name:
            configs.update({
                "taskmanager.numberOfTaskSlots": "12"
            })
            terpDeps += [
                "%s/mahout-flink_%s-%s.jar" % (mahout_home, scala_version, mahout_version),
                "%s/mahout-hdfs-%s.jar" % (mahout_home, mahout_version),
                # NOTE: per the original author, guava-14.0.1.jar (from MAHOUT_HOME) is also
                # required in the lib dir when running against a cluster.
                "com.google.guava:guava:14.0.1",
            ]

        for k, v in configs.items():
            self._updateTerpProp(terpName, k, v)

        for t in terpDeps:
            self._addTerpDep(terpName, t)


#######################################################################################################################
# Helpers for locating ZEPPELIN_HOME, obtaining Mahout and editing Zeppelin's conf files
#######################################################################################################################

def valid_zeppelin_home(path):
    """Return True if path looks like a Zeppelin install, i.e. contains bin/zeppelin-daemon.sh."""
    return isfile(os.path.join(path, "bin", "zeppelin-daemon.sh"))


def find_zeppelin_home(zeppelin_home_arg=None):
    """Return ZEPPELIN_HOME: zeppelin_home_arg if given, else the current directory.

    When run from inside zeppelin/bin, the parent directory is used instead.
    """
    if zeppelin_home_arg is not None:
        return zeppelin_home_arg
    zeppelin_home = getcwd()
    if os.path.basename(zeppelin_home) == "bin" and isfile("zeppelin-daemon.sh"):
        print("we're in the zeppelin/bin")
        zeppelin_home = os.path.dirname(zeppelin_home)
    print("--zeppelin_home not specified, using %s" % zeppelin_home)
    return zeppelin_home


def ensure_interpreter_json(zeppelin_home, wait_seconds=30):
    """Return the path of conf/interpreter.json, starting Zeppelin to create it if needed.

    interpreter.json is created when Zeppelin runs.  If it is missing and
    `zeppelin-daemon.sh status` exits with 1, Zeppelin is started and the file is
    polled for up to wait_seconds.  The returned path may still not exist;
    callers must check.
    """
    interpreter_json_path = os.path.join(zeppelin_home, "conf", "interpreter.json")
    if isfile(interpreter_json_path):
        return interpreter_json_path

    print("interpreter.json doesn't exist. Checking whether Zeppelin is running.")
    status = call(["bin/zeppelin-daemon.sh", 'status'], cwd=zeppelin_home)
    if status == 1:
        print("Zeppelin doesn't appear to be running- it is possible that Zeppelin has never been run "
              "(interpreter.json is created when Zeppelin is run)")
        print("I'm going to try to start Zeppelin to create interpreter.json")
        call(["bin/zeppelin-daemon.sh", 'start'], cwd=zeppelin_home)
        # The file may take a while to appear after start-up; poll rather than a single fixed sleep.
        for _ in range(wait_seconds):
            if isfile(interpreter_json_path):
                break
            sleep(1)
    else:
        print("We're in the correct top-level directory, Zeppelin appears to be running, but there is no "
              "'interpreter.json'.\nThis is a confusing case. Please try restarting Zeppelin, but if that "
              "doesn't work reach out on the mailing list.")
    return interpreter_json_path


def download_mahout(args, zeppelin_home):
    """Decide whether the Mahout binary tarball must be downloaded into zeppelin_home.

    Returns True when --force_download is set (an existing tarball is deleted
    first) or when neither a local tarball nor --mahout_home is available.
    """
    tar_path = os.path.join(zeppelin_home, tar_name)
    if args.force_download:
        print("--force_download: OK, deleting existing tar if it exists.")
        if isfile(tar_path):
            os.remove(tar_path)
        return True
    if isfile(tar_path):
        print("%s found, skipping download" % tar_name)
        return False
    if args.mahout_home:
        print("--mahout_home set, skipping download")
        return False
    return True


def add_mahout_home_to_env(zeppelin_home, mahout_home):
    """Append 'export MAHOUT_HOME=...' to conf/zeppelin-env.sh unless it is already set there.

    The file is created if it does not exist.
    """
    mahout_home_str = '\nexport MAHOUT_HOME=%s\n' % mahout_home
    zeppelin_env_sh_path = os.path.join(zeppelin_home, 'conf', 'zeppelin-env.sh')
    if isfile(zeppelin_env_sh_path):
        with open(zeppelin_env_sh_path) as f:
            already_set = any("export MAHOUT_HOME=" in line for line in f)
        if already_set:
            print("'export MAHOUT_HOME=...' already exists in zeppelin-env.sh, not appending")
            return
    print("appending '%s' to conf/zeppelin-env.sh" % mahout_home_str)
    # 'a' creates the file when it is missing, so one code path covers both cases.
    with open(zeppelin_env_sh_path, 'a') as f:
        f.write(mahout_home_str)


def main(argv=None):
    """Run the installer; argv defaults to sys.argv[1:] (see the argparse options above)."""
    args = parser.parse_args(argv)

    # Need to be sure we know where the Zeppelin top directory is so we can edit conf files.
    zeppelin_home = find_zeppelin_home(args.zeppelin_home)
    if not valid_zeppelin_home(zeppelin_home):
        print("%s does not appear to be a valid ZEPPELIN_HOME - e.g. the top level directory of the ZEPPELIN install"
              % zeppelin_home)
        sys.exit(1)
    print("ZEPPELIN_HOME validated")

    interpreter_json_path = ensure_interpreter_json(zeppelin_home)
    if not isfile(interpreter_json_path):
        # Report the resolved directory; args.zeppelin_home may be None here.
        print("'interpreter.json' not found in %s/conf" % zeppelin_home)
        sys.exit(1)
    z = ZeppelinTerpWrangler(interpreter_json_path)

    # If --mahout_home is not set, download and untar Mahout into ZEPPELIN_HOME
    # and use ZEPPELIN_HOME/<mahout_untar_dir> as MAHOUT_HOME.
    if download_mahout(args, zeppelin_home):
        check_call(['wget', mahout_bin_url], cwd=zeppelin_home)
        check_call(['tar', 'xzf', tar_name], cwd=zeppelin_home)

    if args.mahout_home:
        mahout_home = args.mahout_home
    else:
        mahout_home = os.path.join(zeppelin_home, tar_name[:-len(".tar.gz")])

    # Create the new interpreters.
    z._readTerpJson()
    z.createTerp("spark", "sparkMahout", args.overwrite_existing)
    z.createTerp("flink", "flinkMahout", args.overwrite_existing)
    z.addMahoutConfig("sparkMahout", mahout_home, mahout_version)
    z.addMahoutConfig("flinkMahout", mahout_home, mahout_version)
    z._writeTerpJson()

    add_mahout_home_to_env(zeppelin_home, mahout_home)

    # Zeppelin has to be restarted for new interpreters to show up; do it unless told otherwise.
    if not args.restart_later:
        print("restarting Apache Zeppelin to load new interpreters...")
        check_call(["bin/zeppelin-daemon.sh", 'restart'], cwd=zeppelin_home)
    else:
        print("--restart_later flag detected: remember to restart Zeppelin to see new Mahout interpreters!!")

    print("---------------------------------------------------------------------------------------------------------------")
    print("all done! Thanks for using Apache Mahout")
    print("bye")


if __name__ == "__main__":
    main()