Repository: storm Updated Branches: refs/heads/master 1ea378d78 -> bb8d48da2
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/py/storm/ttypes.py ---------------------------------------------------------------------- diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py index 1bbaf37..e15cf1d 100644 --- a/storm-core/src/py/storm/ttypes.py +++ b/storm-core/src/py/storm/ttypes.py @@ -44,6 +44,26 @@ class TopologyInitialStatus: "INACTIVE": 2, } +class TopologyStatus: + ACTIVE = 1 + INACTIVE = 2 + REBALANCING = 3 + KILLED = 4 + + _VALUES_TO_NAMES = { + 1: "ACTIVE", + 2: "INACTIVE", + 3: "REBALANCING", + 4: "KILLED", + } + + _NAMES_TO_VALUES = { + "ACTIVE": 1, + "INACTIVE": 2, + "REBALANCING": 3, + "KILLED": 4, + } + class JavaObjectArg: """ @@ -3034,6 +3054,7 @@ class ExecutorStats: - emitted - transferred - specific + - rate """ thrift_spec = ( @@ -3041,15 +3062,17 @@ class ExecutorStats: (1, TType.MAP, 'emitted', (TType.STRING,None,TType.MAP,(TType.STRING,None,TType.I64,None)), None, ), # 1 (2, TType.MAP, 'transferred', (TType.STRING,None,TType.MAP,(TType.STRING,None,TType.I64,None)), None, ), # 2 (3, TType.STRUCT, 'specific', (ExecutorSpecificStats, ExecutorSpecificStats.thrift_spec), None, ), # 3 + (4, TType.DOUBLE, 'rate', None, None, ), # 4 ) def __hash__(self): - return 0 + hash(self.emitted) + hash(self.transferred) + hash(self.specific) + return 0 + hash(self.emitted) + hash(self.transferred) + hash(self.specific) + hash(self.rate) - def __init__(self, emitted=None, transferred=None, specific=None,): + def __init__(self, emitted=None, transferred=None, specific=None, rate=None,): self.emitted = emitted self.transferred = transferred self.specific = specific + self.rate = rate def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -3100,6 +3123,11 @@ class ExecutorStats: self.specific.read(iprot) else: iprot.skip(ftype) + elif fid == 4: + if ftype == TType.DOUBLE: + self.rate = iprot.readDouble(); + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -3138,6 +3166,10 @@ class ExecutorStats: oprot.writeFieldBegin('specific', TType.STRUCT, 3) self.specific.write(oprot) oprot.writeFieldEnd() + if self.rate is not None: + oprot.writeFieldBegin('rate', TType.DOUBLE, 4) + oprot.writeDouble(self.rate) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -3148,6 +3180,8 @@ class ExecutorStats: raise TProtocol.TProtocolException(message='Required field transferred is unset!') if self.specific is None: raise TProtocol.TProtocolException(message='Required field specific is unset!') + if self.rate is None: + raise TProtocol.TProtocolException(message='Required field rate is unset!') return @@ -4383,6 +4417,764 @@ class SubmitOptions: def __ne__(self, other): return not (self == other) +class SupervisorInfo: + """ + Attributes: + - time_secs + - hostname + - assignment_id + - used_ports + - meta + - scheduler_meta + - uptime_secs + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'time_secs', None, None, ), # 1 + (2, TType.STRING, 'hostname', None, None, ), # 2 + (3, TType.STRING, 'assignment_id', None, None, ), # 3 + (4, TType.LIST, 'used_ports', (TType.I64,None), None, ), # 4 + (5, TType.LIST, 'meta', (TType.I64,None), None, ), # 5 + (6, TType.MAP, 'scheduler_meta', (TType.STRING,None,TType.STRING,None), None, ), # 6 + (7, TType.I64, 'uptime_secs', None, None, ), # 7 + ) + + def __hash__(self): + return 0 + hash(self.time_secs) + hash(self.hostname) + hash(self.assignment_id) + hash(self.used_ports) + hash(self.meta) + hash(self.scheduler_meta) + hash(self.uptime_secs) + + def __init__(self, time_secs=None, hostname=None, assignment_id=None, used_ports=None, meta=None, scheduler_meta=None, uptime_secs=None,): + self.time_secs = time_secs + self.hostname = hostname + self.assignment_id = assignment_id + self.used_ports = used_ports + self.meta = meta + self.scheduler_meta = scheduler_meta + self.uptime_secs = uptime_secs + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.time_secs = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.hostname = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.assignment_id = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.LIST: + self.used_ports = [] + (_etype304, _size301) = iprot.readListBegin() + for _i305 in xrange(_size301): + _elem306 = iprot.readI64(); + self.used_ports.append(_elem306) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.LIST: + self.meta = [] + (_etype310, _size307) = iprot.readListBegin() + for _i311 in xrange(_size307): + _elem312 = iprot.readI64(); + self.meta.append(_elem312) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.MAP: + self.scheduler_meta = {} + (_ktype314, _vtype315, _size313 ) = iprot.readMapBegin() + for _i317 in xrange(_size313): + _key318 = iprot.readString().decode('utf-8') + _val319 = iprot.readString().decode('utf-8') + self.scheduler_meta[_key318] = _val319 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.I64: + self.uptime_secs = iprot.readI64(); + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('SupervisorInfo') + if self.time_secs is not None: + oprot.writeFieldBegin('time_secs', TType.I64, 1) + oprot.writeI64(self.time_secs) + oprot.writeFieldEnd() + if self.hostname is not None: + oprot.writeFieldBegin('hostname', TType.STRING, 2) + oprot.writeString(self.hostname.encode('utf-8')) + oprot.writeFieldEnd() + if self.assignment_id is not None: + oprot.writeFieldBegin('assignment_id', TType.STRING, 3) + oprot.writeString(self.assignment_id.encode('utf-8')) + oprot.writeFieldEnd() + if self.used_ports is not None: + oprot.writeFieldBegin('used_ports', TType.LIST, 4) + oprot.writeListBegin(TType.I64, len(self.used_ports)) + for iter320 in self.used_ports: + oprot.writeI64(iter320) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.meta is not None: + oprot.writeFieldBegin('meta', TType.LIST, 5) + oprot.writeListBegin(TType.I64, len(self.meta)) + for iter321 in self.meta: + oprot.writeI64(iter321) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.scheduler_meta is not None: + oprot.writeFieldBegin('scheduler_meta', TType.MAP, 6) + oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.scheduler_meta)) + for kiter322,viter323 in self.scheduler_meta.items(): + oprot.writeString(kiter322.encode('utf-8')) + oprot.writeString(viter323.encode('utf-8')) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.uptime_secs is not None: + oprot.writeFieldBegin('uptime_secs', TType.I64, 7) + oprot.writeI64(self.uptime_secs) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.time_secs is None: + raise TProtocol.TProtocolException(message='Required field time_secs is unset!') + if self.hostname is None: + raise TProtocol.TProtocolException(message='Required field hostname is unset!') + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class NodeInfo: + """ + Attributes: + - node + - port + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'node', None, None, ), # 1 + (2, TType.SET, 'port', (TType.I64,None), None, ), # 2 + ) + + def __hash__(self): + return 0 + hash(self.node) + hash(self.port) + + def __init__(self, node=None, port=None,): + self.node = node + self.port = port + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.node = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.SET: + self.port = set() + (_etype327, _size324) = iprot.readSetBegin() + for _i328 in xrange(_size324): + _elem329 = iprot.readI64(); + self.port.add(_elem329) + iprot.readSetEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('NodeInfo') + if self.node is not None: + oprot.writeFieldBegin('node', TType.STRING, 1) + oprot.writeString(self.node.encode('utf-8')) + oprot.writeFieldEnd() + if self.port is not None: + oprot.writeFieldBegin('port', TType.SET, 2) + oprot.writeSetBegin(TType.I64, len(self.port)) + for iter330 in self.port: + oprot.writeI64(iter330) + oprot.writeSetEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.node is None: + raise TProtocol.TProtocolException(message='Required field node is unset!') + if self.port is None: + raise TProtocol.TProtocolException(message='Required field port is unset!') + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class Assignment: + """ + Attributes: + - master_code_dir + - node_host + - executor_node_port + - executor_start_time_secs + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'master_code_dir', None, None, ), # 1 + (2, TType.MAP, 'node_host', (TType.STRING,None,TType.STRING,None), { + }, ), # 2 + (3, TType.MAP, 'executor_node_port', (TType.LIST,(TType.I64,None),TType.STRUCT,(NodeInfo, NodeInfo.thrift_spec)), { + }, ), # 3 + (4, TType.MAP, 'executor_start_time_secs', (TType.LIST,(TType.I64,None),TType.I64,None), { + }, ), # 4 + ) + + def __hash__(self): + return 0 + hash(self.master_code_dir) + hash(self.node_host) + hash(self.executor_node_port) + hash(self.executor_start_time_secs) + + def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4],): + self.master_code_dir = master_code_dir + if node_host is self.thrift_spec[2][4]: + node_host = { + } + self.node_host = node_host + if executor_node_port is self.thrift_spec[3][4]: + executor_node_port = { + } + self.executor_node_port = executor_node_port + if executor_start_time_secs is self.thrift_spec[4][4]: + executor_start_time_secs = { + } + self.executor_start_time_secs = executor_start_time_secs + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.master_code_dir = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.MAP: + self.node_host = {} + (_ktype332, _vtype333, _size331 ) = iprot.readMapBegin() + for _i335 in xrange(_size331): + _key336 = iprot.readString().decode('utf-8') + _val337 = iprot.readString().decode('utf-8') + self.node_host[_key336] = _val337 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.MAP: + self.executor_node_port = {} + (_ktype339, _vtype340, _size338 ) = iprot.readMapBegin() + for _i342 in xrange(_size338): + _key343 = [] + (_etype348, _size345) = iprot.readListBegin() + for _i349 in xrange(_size345): + _elem350 = iprot.readI64(); + _key343.append(_elem350) + iprot.readListEnd() + _val344 = NodeInfo() + _val344.read(iprot) + self.executor_node_port[_key343] = _val344 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.MAP: + self.executor_start_time_secs = {} + (_ktype352, _vtype353, _size351 ) = iprot.readMapBegin() + for _i355 in xrange(_size351): + _key356 = [] + (_etype361, _size358) = iprot.readListBegin() + for _i362 in xrange(_size358): + _elem363 = iprot.readI64(); + _key356.append(_elem363) + iprot.readListEnd() + _val357 = iprot.readI64(); + self.executor_start_time_secs[_key356] = _val357 + iprot.readMapEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Assignment') + if self.master_code_dir is not None: + oprot.writeFieldBegin('master_code_dir', TType.STRING, 1) + oprot.writeString(self.master_code_dir.encode('utf-8')) + oprot.writeFieldEnd() + if self.node_host is not None: + oprot.writeFieldBegin('node_host', TType.MAP, 2) + oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.node_host)) + for kiter364,viter365 in self.node_host.items(): + oprot.writeString(kiter364.encode('utf-8')) + oprot.writeString(viter365.encode('utf-8')) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.executor_node_port is not None: + oprot.writeFieldBegin('executor_node_port', TType.MAP, 3) + oprot.writeMapBegin(TType.LIST, TType.STRUCT, len(self.executor_node_port)) + for kiter366,viter367 in self.executor_node_port.items(): + oprot.writeListBegin(TType.I64, len(kiter366)) + for iter368 in kiter366: + oprot.writeI64(iter368) + oprot.writeListEnd() + viter367.write(oprot) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.executor_start_time_secs is not None: + oprot.writeFieldBegin('executor_start_time_secs', TType.MAP, 4) + oprot.writeMapBegin(TType.LIST, TType.I64, len(self.executor_start_time_secs)) + for kiter369,viter370 in self.executor_start_time_secs.items(): + oprot.writeListBegin(TType.I64, len(kiter369)) + for iter371 in kiter369: + oprot.writeI64(iter371) + oprot.writeListEnd() + oprot.writeI64(viter370) + oprot.writeMapEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.master_code_dir is None: + raise TProtocol.TProtocolException(message='Required field master_code_dir is unset!') + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class TopologyActionOptions: + """ + Attributes: + - kill_options + - rebalance_options + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'kill_options', (KillOptions, KillOptions.thrift_spec), None, ), # 1 + (2, TType.STRUCT, 'rebalance_options', (RebalanceOptions, RebalanceOptions.thrift_spec), None, ), # 2 + ) + + def __hash__(self): + return 0 + hash(self.kill_options) + hash(self.rebalance_options) + + def __init__(self, kill_options=None, rebalance_options=None,): + self.kill_options = kill_options + self.rebalance_options = rebalance_options + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.kill_options = KillOptions() + self.kill_options.read(iprot) + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRUCT: + self.rebalance_options = RebalanceOptions() + self.rebalance_options.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('TopologyActionOptions') + if self.kill_options is not None: + oprot.writeFieldBegin('kill_options', TType.STRUCT, 1) + self.kill_options.write(oprot) + oprot.writeFieldEnd() + if self.rebalance_options is not None: + oprot.writeFieldBegin('rebalance_options', TType.STRUCT, 2) + self.rebalance_options.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class StormBase: + """ + Attributes: + - name + - status + - num_workers + - component_executors + - launch_time_secs + - owner + - topology_action_options + - prev_status + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'name', None, None, ), # 1 + (2, TType.I32, 'status', None, None, ), # 2 + (3, TType.I32, 'num_workers', None, None, ), # 3 + (4, TType.MAP, 'component_executors', (TType.STRING,None,TType.I32,None), None, ), # 4 + (5, TType.I32, 'launch_time_secs', None, None, ), # 5 + (6, TType.STRING, 'owner', None, None, ), # 6 + (7, TType.STRUCT, 'topology_action_options', (TopologyActionOptions, TopologyActionOptions.thrift_spec), None, ), # 7 + (8, TType.I32, 'prev_status', None, None, ), # 8 + ) + + def __hash__(self): + return 0 + hash(self.name) + hash(self.status) + hash(self.num_workers) + hash(self.component_executors) + hash(self.launch_time_secs) + hash(self.owner) + hash(self.topology_action_options) + hash(self.prev_status) + + def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None,): + self.name = name + self.status = status + self.num_workers = num_workers + self.component_executors = component_executors + self.launch_time_secs = launch_time_secs + self.owner = owner + self.topology_action_options = topology_action_options + self.prev_status = prev_status + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.name = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I32: + self.status = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.num_workers = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.MAP: + self.component_executors = {} + (_ktype373, _vtype374, _size372 ) = iprot.readMapBegin() + for _i376 in xrange(_size372): + _key377 = iprot.readString().decode('utf-8') + _val378 = iprot.readI32(); + self.component_executors[_key377] = _val378 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I32: + self.launch_time_secs = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.STRING: + self.owner = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.STRUCT: + self.topology_action_options = TopologyActionOptions() + self.topology_action_options.read(iprot) + else: + iprot.skip(ftype) + elif fid == 8: + if ftype == TType.I32: + self.prev_status = iprot.readI32(); + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('StormBase') + if self.name is not None: + oprot.writeFieldBegin('name', TType.STRING, 1) + oprot.writeString(self.name.encode('utf-8')) + oprot.writeFieldEnd() + if self.status is not None: + oprot.writeFieldBegin('status', TType.I32, 2) + oprot.writeI32(self.status) + oprot.writeFieldEnd() + if self.num_workers is not None: + oprot.writeFieldBegin('num_workers', TType.I32, 3) + oprot.writeI32(self.num_workers) + oprot.writeFieldEnd() + if self.component_executors is not None: + oprot.writeFieldBegin('component_executors', TType.MAP, 4) + oprot.writeMapBegin(TType.STRING, TType.I32, len(self.component_executors)) + for kiter379,viter380 in self.component_executors.items(): + oprot.writeString(kiter379.encode('utf-8')) + oprot.writeI32(viter380) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.launch_time_secs is not None: + oprot.writeFieldBegin('launch_time_secs', TType.I32, 5) + oprot.writeI32(self.launch_time_secs) + oprot.writeFieldEnd() + if self.owner is not None: + oprot.writeFieldBegin('owner', TType.STRING, 6) + oprot.writeString(self.owner.encode('utf-8')) + oprot.writeFieldEnd() + if self.topology_action_options is not None: + oprot.writeFieldBegin('topology_action_options', TType.STRUCT, 7) + self.topology_action_options.write(oprot) + oprot.writeFieldEnd() + if self.prev_status is not None: + oprot.writeFieldBegin('prev_status', TType.I32, 8) + oprot.writeI32(self.prev_status) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.name is None: + raise TProtocol.TProtocolException(message='Required field name is unset!') + if self.status is None: + raise TProtocol.TProtocolException(message='Required field status is unset!') + if self.num_workers is None: + raise TProtocol.TProtocolException(message='Required field num_workers is unset!') + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class ZKWorkerHeartbeat: + """ + Attributes: + - storm_id + - executor_stats + - time_secs + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'storm_id', None, None, ), # 1 + (2, TType.MAP, 'executor_stats', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec),TType.STRUCT,(ExecutorStats, ExecutorStats.thrift_spec)), None, ), # 2 + (3, TType.I32, 'time_secs', None, None, ), # 3 + ) + + def __hash__(self): + return 0 + hash(self.storm_id) + hash(self.executor_stats) + hash(self.time_secs) + + def __init__(self, storm_id=None, executor_stats=None, time_secs=None,): + self.storm_id = storm_id + self.executor_stats = executor_stats + self.time_secs = time_secs + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.storm_id = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.MAP: + self.executor_stats = {} + (_ktype382, _vtype383, _size381 ) = iprot.readMapBegin() + for _i385 in xrange(_size381): + _key386 = ExecutorInfo() + _key386.read(iprot) + _val387 = ExecutorStats() + _val387.read(iprot) + self.executor_stats[_key386] = _val387 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.time_secs = iprot.readI32(); + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('ZKWorkerHeartbeat') + if self.storm_id is not None: + oprot.writeFieldBegin('storm_id', TType.STRING, 1) + oprot.writeString(self.storm_id.encode('utf-8')) + oprot.writeFieldEnd() + if self.executor_stats is not None: + oprot.writeFieldBegin('executor_stats', TType.MAP, 2) + oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.executor_stats)) + for kiter388,viter389 in self.executor_stats.items(): + kiter388.write(oprot) + viter389.write(oprot) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.time_secs is not None: + oprot.writeFieldBegin('time_secs', TType.I32, 3) + oprot.writeI32(self.time_secs) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.storm_id is None: + raise TProtocol.TProtocolException(message='Required field storm_id is unset!') + if self.executor_stats is None: + raise TProtocol.TProtocolException(message='Required field executor_stats is unset!') + if self.time_secs is None: + raise TProtocol.TProtocolException(message='Required field time_secs is unset!') + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class DRPCRequest: """ Attributes: http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/storm.thrift ---------------------------------------------------------------------- diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift index f807b74..3cc0eb9 100644 --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@ -193,6 +193,7 @@ struct ExecutorStats { 1: required map<string, map<string, i64>> emitted; 2: required map<string, map<string, i64>> transferred; 3: required ExecutorSpecificStats specific; + 4: required double rate; } struct ExecutorInfo { @@ -243,6 +244,56 @@ struct SubmitOptions { 2: optional Credentials creds; } +struct SupervisorInfo { + 1: required i64 time_secs; + 2: required string hostname; + 3: optional string assignment_id; + 4: optional list<i64> used_ports; + 5: optional list<i64> meta; + 6: optional map<string, string> scheduler_meta; + 7: optional i64 uptime_secs; +} +struct NodeInfo { + 1: required string node; + 2: required set<i64> port; +} + +struct Assignment { + 1: required string master_code_dir; + 2: optional map<string, string> node_host = {}; + 3: optional map<list<i64>, NodeInfo> executor_node_port = {}; + 4: optional map<list<i64>, i64> executor_start_time_secs = {}; +} + +enum TopologyStatus { + ACTIVE = 1, + INACTIVE = 2, + REBALANCING = 3, + KILLED = 4 +} + +union TopologyActionOptions { + 1: optional KillOptions kill_options; + 2: optional RebalanceOptions rebalance_options; +} + +struct StormBase { + 1: required string name; + 2: required TopologyStatus status; + 3: required i32 num_workers; + 4: optional map<string, i32> component_executors; + 5: optional i32 launch_time_secs; + 6: optional string owner; + 7: optional TopologyActionOptions topology_action_options; + 8: optional TopologyStatus prev_status;//currently only used during rebalance action. +} + +struct ZKWorkerHeartbeat { + 1: required string storm_id; + 2: required map<ExecutorInfo,ExecutorStats> executor_stats; + 3: required i32 time_secs; +} + service Nimbus { void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze); void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze); http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/test/clj/backtype/storm/cluster_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/cluster_test.clj b/storm-core/test/clj/backtype/storm/cluster_test.clj index 7ed1028..98eae68 100644 --- a/storm-core/test/clj/backtype/storm/cluster_test.clj +++ b/storm-core/test/clj/backtype/storm/cluster_test.clj @@ -25,7 +25,7 @@ (:require [conjure.core]) (:use [conjure core]) (:use [clojure test]) - (:use [backtype.storm cluster config util testing])) + (:use [backtype.storm cluster config util testing thrift log])) (defn mk-config [zk-port] (merge (read-storm-config) @@ -168,10 +168,10 @@ (deftest test-storm-cluster-state-basics (with-inprocess-zookeeper zk-port (let [state (mk-storm-state zk-port) - assignment1 (Assignment. "/aaa" {} {1 [2 2002 1]} {}) - assignment2 (Assignment. "/aaa" {} {1 [2 2002]} {}) - base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "") - base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "")] + assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {}) + assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {}) + base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil) + base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil)] (is (= [] (.assignments state nil))) (.set-assignment! state "storm1" assignment1) (is (= assignment1 (.assignment-info state "storm1" nil))) @@ -242,12 +242,15 @@ (deftest test-supervisor-state (with-inprocess-zookeeper zk-port (let [state1 (mk-storm-state zk-port) - state2 (mk-storm-state zk-port)] + state2 (mk-storm-state zk-port) + supervisor-info1 (SupervisorInfo. 10 "hostname-1" "id1" [1 2] [] {} 1000 ) + supervisor-info2 (SupervisorInfo. 10 "hostname-2" "id2" [1 2] [] {} 1000 ) + ] (is (= [] (.supervisors state1 nil))) - (.supervisor-heartbeat! state2 "2" {:a 1}) - (.supervisor-heartbeat! state1 "1" {}) - (is (= {:a 1} (.supervisor-info state1 "2"))) - (is (= {} (.supervisor-info state1 "1"))) + (.supervisor-heartbeat! state2 "2" supervisor-info2) + (.supervisor-heartbeat! state1 "1" supervisor-info1) + (is (= supervisor-info2 (.supervisor-info state1 "2"))) + (is (= supervisor-info1 (.supervisor-info state1 "1"))) (is (= #{"1" "2"} (set (.supervisors state1 nil)))) (is (= #{"1" "2"} (set (.supervisors state2 nil)))) (.disconnect state2) @@ -255,8 +258,6 @@ (.disconnect state1) ))) - - (deftest test-cluster-authentication (with-inprocess-zookeeper zk-port (let [builder (Mockito/mock CuratorFrameworkFactory$Builder) http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/test/clj/backtype/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj index efdad25..7671f58 100644 --- a/storm-core/test/clj/backtype/storm/nimbus_test.clj +++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj @@ -15,7 +15,7 @@ ;; limitations under the License. (ns backtype.storm.nimbus-test (:use [clojure test]) - (:require [backtype.storm [util :as util]]) + (:require [backtype.storm [util :as util] [stats :as stats]]) (:require [backtype.storm.daemon [nimbus :as nimbus]]) (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter]) (:import [backtype.storm.scheduler INimbus]) @@ -113,7 +113,7 @@ curr-beat (.get-worker-heartbeat state storm-id node port) stats (:executor-stats curr-beat)] (.worker-heartbeat! state storm-id node port - {:storm-id storm-id :time-secs (current-time-secs) :uptime 10 :executor-stats (merge stats {executor nil})} + {:storm-id storm-id :time-secs (current-time-secs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})} ))) (defn slot-assignments [cluster storm-id] @@ -486,7 +486,7 @@ (bind [executor-id1 executor-id2] (topology-executors cluster storm-id)) (bind ass1 (executor-assignment cluster storm-id executor-id1)) (bind ass2 (executor-assignment cluster storm-id executor-id2)) - + (advance-cluster-time cluster 59) (do-executor-heartbeat cluster storm-id executor-id1) (do-executor-heartbeat cluster storm-id executor-id2) http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegateTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegateTest.java b/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegateTest.java index ce49a8a..9026ec3 100644 --- a/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegateTest.java +++ b/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegateTest.java @@ -41,7 +41,7 @@ public class GzipBridgeSerializationDelegateTest { byte[] serialized = new GzipSerializationDelegate().serialize(pojo); - TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized); + TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized, TestPojo.class); assertEquals(pojo2.name, pojo.name); assertEquals(pojo2.age, pojo.age); @@ -55,7 +55,7 @@ public class GzipBridgeSerializationDelegateTest { byte[] serialized = new GzipBridgeSerializationDelegate().serialize(pojo); - TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized); + TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized, TestPojo.class); assertEquals(pojo2.name, pojo.name); assertEquals(pojo2.age, pojo.age); @@ -69,7 +69,7 @@ public class GzipBridgeSerializationDelegateTest { byte[] serialized = new DefaultSerializationDelegate().serialize(pojo); - TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized); + TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized, TestPojo.class); assertEquals(pojo2.name, pojo.name); assertEquals(pojo2.age, pojo.age); http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java b/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java new file mode 100644 index 0000000..ef17017 --- /dev/null +++ b/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.serialization; + +import backtype.storm.generated.ErrorInfo; +import org.junit.Before; +import org.junit.Test; + +import java.io.Serializable; + +import static org.junit.Assert.assertEquals; + + +public class ThriftBridgeSerializationDelegateTest { + + SerializationDelegate testDelegate; + + @Before + public void setUp() throws Exception { + testDelegate = new ThriftSerializationDelegateBridge(); + testDelegate.prepare(null); + } + + @Test + public void testNonThriftInstance() throws Exception { + TestPojo pojo = new TestPojo(); + pojo.name = "foo"; + pojo.age = 100; + + byte[] serialized = new DefaultSerializationDelegate().serialize(pojo); + + TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized, TestPojo.class); + + assertEquals(pojo2.name, pojo.name); + assertEquals(pojo2.age, pojo.age); + + serialized = testDelegate.serialize(pojo); + pojo2 = (TestPojo) new DefaultSerializationDelegate().deserialize(serialized, Serializable.class); + assertEquals(pojo2.name, pojo.name); + assertEquals(pojo2.age, pojo.age); + } + + @Test + public void testThriftInstance() throws Exception { + ErrorInfo errorInfo = new ErrorInfo(); + errorInfo.set_error("error"); + errorInfo.set_error_time_secs(1); + errorInfo.set_host("host"); + errorInfo.set_port(1); + + byte[] serialized = new ThriftSerializationDelegate().serialize(errorInfo); + ErrorInfo errorInfo2 = testDelegate.deserialize(serialized, ErrorInfo.class); + assertEquals(errorInfo, errorInfo2); + + serialized = testDelegate.serialize(errorInfo); + errorInfo2 = new ThriftSerializationDelegate().deserialize(serialized, ErrorInfo.class); + assertEquals(errorInfo, errorInfo2); + } + + static class TestPojo implements Serializable { + String name; + int age; + } +}