Updated Branches: refs/heads/master 1a3b6bb8b -> 3c771fb8a
Refactored mesos-tail to use helpers from cli.py. From: Shingo Omura <everpe...@gmail.com> Review: https://reviews.apache.org/r/15336 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c771fb8 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c771fb8 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c771fb8 Branch: refs/heads/master Commit: 3c771fb8a9b50ce243b9c6c620713c37c1713186 Parents: 1a3b6bb Author: Benjamin Mahler <bmah...@twitter.com> Authored: Mon Nov 11 13:57:17 2013 -0800 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Mon Nov 11 13:57:17 2013 -0800 ---------------------------------------------------------------------- src/cli/mesos-tail | 253 ++++++++++++++++++------------------------------ 1 file changed, 95 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3c771fb8/src/cli/mesos-tail ---------------------------------------------------------------------- diff --git a/src/cli/mesos-tail b/src/cli/mesos-tail index f4f7c75..33acee4 100755 --- a/src/cli/mesos-tail +++ b/src/cli/mesos-tail @@ -1,103 +1,54 @@ #!/usr/bin/env python -import datetime import json import os -import resource import signal -import subprocess import sys import time -import urllib +import itertools +from contextlib import closing from optparse import OptionParser +from urllib2 import HTTPError, urlopen + +from mesos import http +from mesos.cli import * +from mesos.futures import * if sys.version_info < (2,6,0): - sys.stderr.write('Expecting Python >= 2.6\n') - sys.exit(1) - - -# Helper that uses 'mesos-resolve' to resolve the master's IP:port. -def resolve(master): - process = subprocess.Popen( - ['mesos-resolve', master], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=False) - - status = process.wait() - if status != 0: - print "Failed to resolve 'mesos-resolve %s'\n" % master - print process.stderr.read() + sys.stderr.write('Expecting Python >= 2.6\n') sys.exit(1) - return process.stdout.read() - - -class Slave: - def __init__(self, slave): - self.slave = slave - - def hostname(self): - return self.slave['hostname'] - - def curl(self, path, query): - pid = self.slave['pid'] - url = 'http://' + pid[len('slave(1)@'):] + path - if query is not None and len(query) > 0: - url += '?' + '&'.join( - ['%s=%s' % (urllib.quote(str(key)), urllib.quote(str(value))) for (key, value) in query.items()]) - - process = subprocess.Popen( - ['curl', '-sSfL', url], - stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=False) - - status = process.wait() - if status != 0: - print 'Failed to execute \'curl\':\n' - print process.stderr.read() - sys.exit(1) - - result = process.stdout.read() - process.stdout.close() - process.stderr.close() - return result - - def tail(self, task, file): +def read_forever(slave, task, file): framework_id = task['framework_id'] executor_id = task['executor_id'] + + # An executorless task has an empty executor ID in the master but + # uses the same executor ID as task ID in the slave. if executor_id == "": executor_id = task['id'] # Get 'state.json' to get the executor directory. - state = json.loads(self.curl('/slave(1)/state.json', None)) + try: + state = http.get(slave['pid'], '/slave(1)/state.json', None) + except: + sys.stderr.write('Failed to get state from slave\n') + sys.exit(1) directory = None - for framework in state['frameworks']: - if framework['id'] == framework_id: - for executor in framework['executors']: - if executor['id'] == executor_id: - directory = executor['directory'] - break - for completed_executor in framework['completed_executors']: - if completed_executor['id'] == executor_id: - directory = completed_executor['directory'] - break - - for completed_framework in state['completed_frameworks']: - if completed_framework['id'] == framework_id: - for completed_executor in completed_framework['completed_executors']: - if completed_executor['id'] == executor_id: - directory = completed_executor['directory'] - break + for framework in itertools.chain(state['frameworks'], + state['completed_frameworks']): + if framework['id'] == framework_id: + for executor in itertools.chain(framework['executors'], + framework['completed_executors']): + if executor['id'] == executor_id: + directory = executor['directory'] + break if directory is None: - raise IOError('Task directory/file not found') + sys.stderr.write('Task directory not found\n') + sys.exit(1) path = os.path.join(directory, file) @@ -106,91 +57,77 @@ class Slave: offset = 0 while True: - result = json.loads(self.curl( - '/files/read.json', - {'path': path, - 'offset': offset, - 'length': PAGE_LENGTH})) - if len(result['data']) == 0: - time.sleep(0.5) - continue - offset += len(result['data']) - yield result['data'] + try: + result = http.get(slave['pid'], + '/files/read.json', + {'path': path, + 'offset': offset, + 'length': PAGE_LENGTH}) + except HTTPError as error: + if error.code == 404: + sys.stderr.write('No such file or directory\n') + else: + sys.stderr.write('Failed to read file from slave\n') + sys.exit(1) + if len(result['data']) == 0: + time.sleep(0.5) + continue + offset += len(result['data']) + yield result['data'] def main(): - # Parse options for this script. - parser = OptionParser() - parser.add_option('--master') - parser.add_option('--framework') - parser.add_option('--task') - parser.add_option('--file') - (options, args) = parser.parse_args(sys.argv) - - if options.master is None: - print "Missing --master\n" - parser.print_help() - exit(-1) - - if options.framework is None: - print "Missing --framework\n" - parser.print_help() - exit(-1) - - if options.task is None: - print "Missing --task\n" - parser.print_help() - exit(-1) - - if options.file is None: - print "Missing --file\n" - parser.print_help() - exit(-1) - - url = 'http://' + resolve(options.master) + '/master/state.json' - file = urllib.urlopen(url) - state = json.loads(file.read()) - file.close() - - # Build a dict from slave ID to `slaves'. - slaves = {} - for slave in state['slaves']: - slaves[slave['id']] = Slave(slave) - - target_task = None - target_slave = None - - for framework in state['frameworks']: - if framework['id'] == options.framework: - for task in framework['tasks']: - if (task['id'] == options.task): - target_task = task - target_slave = slaves[task['slave_id']] - break - for completed_task in framework['completed_tasks']: - if (completed_task['id'] == options.task): - target_task = completed_task - target_slave = slaves[completed_task['slave_id']] - break - - for completed_framework in state['completed_frameworks']: - if completed_framework['id'] == options.framework: - for completed_task in completed_framework['completed_tasks']: - if (completed_task['id'] == options.task): - target_task = completed_task - target_slave= slaves[completed_task['slave_id']] - break - - for data in target_slave.tail(target_task, options.file): - sys.stdout.write(data) - sys.stdout.flush() - - sys.stderr.write('No task found!\n') - sys.stderr.flush() - exit(-1) - - -if __name__ == "__main__": + # Parse options for this script. + parser = OptionParser() + parser.add_option('--master') + parser.add_option('--framework') + parser.add_option('--task') + parser.add_option('--file') + (options, args) = parser.parse_args(sys.argv) + + if options.master is None: + usage('Missing --master', parser) + + if options.framework is None: + usage('Missing --framework', parser) + + if options.task is None: + usage('Missing --task', parser) + + if options.file is None: + usage('Missing --file', parser) + + # Get the master's state. + try: + master_state = http.get(resolve(options.master), '/master/state.json') + except: + sys.stderr.write('Failed to get the master state\n') + sys.exit(1) + + # Build a dict from slave ID to `slaves'. + slaves = {} + for slave in master_state['slaves']: + slaves[slave['id']] = slave + + def tail(slave, task, file): + for data in read_forever(slave, task, options.file): + sys.stdout.write(data) + sys.stdout.flush() + + for framework in itertools.chain(master_state['frameworks'], \ + master_state['completed_frameworks']): + if framework['id'] == options.framework: + for task in itertools.chain(framework['tasks'], framework['completed_tasks']): + if (task['id'] == options.task): + tail(slaves[task['slave_id']], task, options.file) + sys.exit(0) + + sys.stderr.write('No task or framework found!\n') + sys.stderr.flush() + sys.exit(-1) + + +if __name__ == '__main__': def signal_handler(signal, frame): sys.stdout.write('\n') sys.exit(130)