Merge remote-tracking branch 'upstream/master' into ru

Conflicts:
    storm-core/src/clj/backtype/storm/cluster.clj
    storm-core/src/py/storm/ttypes.py
    storm-core/src/storm.thrift
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0571e22c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0571e22c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0571e22c Branch: refs/heads/master Commit: 0571e22c15a1214c3e08510fddad608571f6a6d8 Parents: 6390064 8036109 Author: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com> Authored: Mon Feb 9 12:47:36 2015 -0800 Committer: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com> Committed: Mon Feb 9 12:47:36 2015 -0800 ---------------------------------------------------------------------- CHANGELOG.md | 9 + README.markdown | 8 +- STORM-UI-REST-API.md | 21 + dev-tools/github/__init__.py | 11 + dev-tools/jira-github-join.py | 4 +- dev-tools/storm-merge.py | 31 + docs/README.md | 9 + docs/documentation/Common-patterns.md | 14 +- docs/documentation/Concepts.md | 13 +- docs/documentation/Home.md | 2 +- docs/documentation/Multilang-protocol.md | 4 +- docs/documentation/Powered-By.md | 4 +- .../storm/starter/SkewedRollingTopWords.java | 134 +++ .../storm/starter/bolt/RollingCountAggBolt.java | 78 ++ pom.xml | 11 + storm-core/src/clj/backtype/storm/cluster.clj | 43 +- .../src/clj/backtype/storm/daemon/logviewer.clj | 5 +- .../src/clj/backtype/storm/daemon/nimbus.clj | 35 +- .../src/clj/backtype/storm/daemon/worker.clj | 6 +- storm-core/src/clj/backtype/storm/ui/core.clj | 34 +- .../coordination/BatchSubtopologyBuilder.java | 11 + .../storm/drpc/LinearDRPCInputDeclarer.java | 5 +- .../storm/drpc/LinearDRPCTopologyBuilder.java | 13 +- .../storm/generated/GetInfoOptions.java | 350 +++++++ .../jvm/backtype/storm/generated/Nimbus.java | 974 +++++++++++++++++++ .../storm/generated/NumErrorsChoice.java | 64 ++ .../storm/grouping/PartialKeyGrouping.java | 31 +- .../backtype/storm/topology/InputDeclarer.java | 3 + .../storm/topology/TopologyBuilder.java | 11 + .../TransactionalTopologyBuilder.java | 13 +- .../src/jvm/backtype/storm/utils/Monitor.java | 8 +- 
.../topology/TridentTopologyBuilder.java | 13 +- storm-core/src/py/storm/DistributedRPC-remote | 0 .../py/storm/DistributedRPCInvocations-remote | 0 storm-core/src/py/storm/Nimbus-remote | 7 + storm-core/src/py/storm/Nimbus.py | 226 +++++ storm-core/src/py/storm/ttypes.py | 80 ++ storm-core/src/storm.thrift | 10 + .../clj/backtype/storm/integration_test.clj | 10 +- .../storm/grouping/PartialKeyGroupingTest.java | 26 +- 40 files changed, 2272 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/clj/backtype/storm/cluster.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/backtype/storm/cluster.clj index 15bf8a3,4b73f2e..1d5026f --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@@ -230,11 -239,11 +241,11 @@@ (cb id)))) (defn- maybe-deserialize - [ser] + [ser clazz] (when ser - (Utils/deserialize ser))) + (Utils/deserialize ser clazz))) - (defstruct TaskError :error :time-secs :host :port) + (defrecord TaskError [error time-secs host port]) (defn- parse-error-path [^String p] @@@ -440,9 -441,13 +451,13 @@@ (report-error [this storm-id component-id node port error] (let [path (error-path storm-id component-id) + last-error-path (last-error-path storm-id component-id) - data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port} + data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}) _ (mkdirs cluster-state path acls) - _ (create-sequential cluster-state (str path "/e") (Utils/serialize data) acls) + ser-data (Utils/serialize data) + _ (mkdirs cluster-state path acls) + _ (create-sequential cluster-state (str path "/e") ser-data acls) + _ (set-data cluster-state last-error-path ser-data acls) to-kill (->> (get-children cluster-state path false) (sort-by 
parse-error-path) reverse @@@ -455,16 -460,22 +470,24 @@@ (let [path (error-path storm-id component-id) errors (if (exists-node? cluster-state path false) (dofor [c (get-children cluster-state path false)] - (let [data (-> (get-data cluster-state (str path "/" c) false) - (maybe-deserialize ErrorInfo) - clojurify-error)] - (when data - (struct TaskError (:error data) (:time-secs data) (:host data) (:port data)) - ))) - ()) - ] + (if-let [data (-> (get-data cluster-state + (str path "/" c) + false) - maybe-deserialize)] ++ (maybe-deserialize ErrorInfo) ++ clojurify-error)] + (map->TaskError data))) + ())] (->> (filter not-nil? errors) (sort-by (comp - :time-secs))))) + + (last-error + [this storm-id component-id] + (let [path (last-error-path storm-id component-id)] + (if (exists-node? cluster-state path false) - (if-let [data (->> (get-data cluster-state path false) - maybe-deserialize)] ++ (if-let [data (-> (get-data cluster-state path false) ++ (maybe-deserialize ErrorInfo) ++ clojurify-error)] + (map->TaskError data))))) (disconnect [this] http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/clj/backtype/storm/daemon/worker.clj ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/py/storm/ttypes.py ---------------------------------------------------------------------- diff --cc storm-core/src/py/storm/ttypes.py index e15cf1d,46e7a92..112daaa --- a/storm-core/src/py/storm/ttypes.py +++ b/storm-core/src/py/storm/ttypes.py @@@ -44,26 -44,23 +44,43 @@@ class TopologyInitialStatus "INACTIVE": 2, } +class TopologyStatus: + ACTIVE = 1 + INACTIVE = 2 + REBALANCING = 3 + KILLED = 4 + + _VALUES_TO_NAMES = { + 1: "ACTIVE", + 2: "INACTIVE", + 3: "REBALANCING", + 4: "KILLED", + } + + 
_NAMES_TO_VALUES = { + "ACTIVE": 1, + "INACTIVE": 2, + "REBALANCING": 3, + "KILLED": 4, + } + + class NumErrorsChoice: + ALL = 0 + NONE = 1 + ONE = 2 + + _VALUES_TO_NAMES = { + 0: "ALL", + 1: "NONE", + 2: "ONE", + } + + _NAMES_TO_VALUES = { + "ALL": 0, + "NONE": 1, + "ONE": 2, + } + class JavaObjectArg: """ @@@ -4417,764 -4400,69 +4434,827 @@@ class SubmitOptions def __ne__(self, other): return not (self == other) +class SupervisorInfo: + """ + Attributes: + - time_secs + - hostname + - assignment_id + - used_ports + - meta + - scheduler_meta + - uptime_secs + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'time_secs', None, None, ), # 1 + (2, TType.STRING, 'hostname', None, None, ), # 2 + (3, TType.STRING, 'assignment_id', None, None, ), # 3 + (4, TType.LIST, 'used_ports', (TType.I64,None), None, ), # 4 + (5, TType.LIST, 'meta', (TType.I64,None), None, ), # 5 + (6, TType.MAP, 'scheduler_meta', (TType.STRING,None,TType.STRING,None), None, ), # 6 + (7, TType.I64, 'uptime_secs', None, None, ), # 7 + ) + + def __hash__(self): + return 0 + hash(self.time_secs) + hash(self.hostname) + hash(self.assignment_id) + hash(self.used_ports) + hash(self.meta) + hash(self.scheduler_meta) + hash(self.uptime_secs) + + def __init__(self, time_secs=None, hostname=None, assignment_id=None, used_ports=None, meta=None, scheduler_meta=None, uptime_secs=None,): + self.time_secs = time_secs + self.hostname = hostname + self.assignment_id = assignment_id + self.used_ports = used_ports + self.meta = meta + self.scheduler_meta = scheduler_meta + self.uptime_secs = uptime_secs + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: 
+ break + if fid == 1: + if ftype == TType.I64: + self.time_secs = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.hostname = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.assignment_id = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.LIST: + self.used_ports = [] + (_etype304, _size301) = iprot.readListBegin() + for _i305 in xrange(_size301): + _elem306 = iprot.readI64(); + self.used_ports.append(_elem306) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.LIST: + self.meta = [] + (_etype310, _size307) = iprot.readListBegin() + for _i311 in xrange(_size307): + _elem312 = iprot.readI64(); + self.meta.append(_elem312) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.MAP: + self.scheduler_meta = {} + (_ktype314, _vtype315, _size313 ) = iprot.readMapBegin() + for _i317 in xrange(_size313): + _key318 = iprot.readString().decode('utf-8') + _val319 = iprot.readString().decode('utf-8') + self.scheduler_meta[_key318] = _val319 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.I64: + self.uptime_secs = iprot.readI64(); + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('SupervisorInfo') + if self.time_secs is not None: + oprot.writeFieldBegin('time_secs', TType.I64, 1) + oprot.writeI64(self.time_secs) + oprot.writeFieldEnd() + if self.hostname is not None: + oprot.writeFieldBegin('hostname', TType.STRING, 2) + oprot.writeString(self.hostname.encode('utf-8')) + 
oprot.writeFieldEnd() + if self.assignment_id is not None: + oprot.writeFieldBegin('assignment_id', TType.STRING, 3) + oprot.writeString(self.assignment_id.encode('utf-8')) + oprot.writeFieldEnd() + if self.used_ports is not None: + oprot.writeFieldBegin('used_ports', TType.LIST, 4) + oprot.writeListBegin(TType.I64, len(self.used_ports)) + for iter320 in self.used_ports: + oprot.writeI64(iter320) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.meta is not None: + oprot.writeFieldBegin('meta', TType.LIST, 5) + oprot.writeListBegin(TType.I64, len(self.meta)) + for iter321 in self.meta: + oprot.writeI64(iter321) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.scheduler_meta is not None: + oprot.writeFieldBegin('scheduler_meta', TType.MAP, 6) + oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.scheduler_meta)) + for kiter322,viter323 in self.scheduler_meta.items(): + oprot.writeString(kiter322.encode('utf-8')) + oprot.writeString(viter323.encode('utf-8')) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.uptime_secs is not None: + oprot.writeFieldBegin('uptime_secs', TType.I64, 7) + oprot.writeI64(self.uptime_secs) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.time_secs is None: + raise TProtocol.TProtocolException(message='Required field time_secs is unset!') + if self.hostname is None: + raise TProtocol.TProtocolException(message='Required field hostname is unset!') + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class NodeInfo: + """ + Attributes: + - node + - port + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'node', None, None, ), # 1 + (2, TType.SET, 'port', 
(TType.I64,None), None, ), # 2 + ) + + def __hash__(self): + return 0 + hash(self.node) + hash(self.port) + + def __init__(self, node=None, port=None,): + self.node = node + self.port = port + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.node = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.SET: + self.port = set() + (_etype327, _size324) = iprot.readSetBegin() + for _i328 in xrange(_size324): + _elem329 = iprot.readI64(); + self.port.add(_elem329) + iprot.readSetEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('NodeInfo') + if self.node is not None: + oprot.writeFieldBegin('node', TType.STRING, 1) + oprot.writeString(self.node.encode('utf-8')) + oprot.writeFieldEnd() + if self.port is not None: + oprot.writeFieldBegin('port', TType.SET, 2) + oprot.writeSetBegin(TType.I64, len(self.port)) + for iter330 in self.port: + oprot.writeI64(iter330) + oprot.writeSetEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.node is None: + raise TProtocol.TProtocolException(message='Required field node is unset!') + if self.port is None: + raise TProtocol.TProtocolException(message='Required field port is unset!') + 
return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class Assignment: + """ + Attributes: + - master_code_dir + - node_host + - executor_node_port + - executor_start_time_secs + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'master_code_dir', None, None, ), # 1 + (2, TType.MAP, 'node_host', (TType.STRING,None,TType.STRING,None), { + }, ), # 2 + (3, TType.MAP, 'executor_node_port', (TType.LIST,(TType.I64,None),TType.STRUCT,(NodeInfo, NodeInfo.thrift_spec)), { + }, ), # 3 + (4, TType.MAP, 'executor_start_time_secs', (TType.LIST,(TType.I64,None),TType.I64,None), { + }, ), # 4 + ) + + def __hash__(self): + return 0 + hash(self.master_code_dir) + hash(self.node_host) + hash(self.executor_node_port) + hash(self.executor_start_time_secs) + + def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4],): + self.master_code_dir = master_code_dir + if node_host is self.thrift_spec[2][4]: + node_host = { + } + self.node_host = node_host + if executor_node_port is self.thrift_spec[3][4]: + executor_node_port = { + } + self.executor_node_port = executor_node_port + if executor_start_time_secs is self.thrift_spec[4][4]: + executor_start_time_secs = { + } + self.executor_start_time_secs = executor_start_time_secs + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + 
if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.master_code_dir = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.MAP: + self.node_host = {} + (_ktype332, _vtype333, _size331 ) = iprot.readMapBegin() + for _i335 in xrange(_size331): + _key336 = iprot.readString().decode('utf-8') + _val337 = iprot.readString().decode('utf-8') + self.node_host[_key336] = _val337 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.MAP: + self.executor_node_port = {} + (_ktype339, _vtype340, _size338 ) = iprot.readMapBegin() + for _i342 in xrange(_size338): + _key343 = [] + (_etype348, _size345) = iprot.readListBegin() + for _i349 in xrange(_size345): + _elem350 = iprot.readI64(); + _key343.append(_elem350) + iprot.readListEnd() + _val344 = NodeInfo() + _val344.read(iprot) + self.executor_node_port[_key343] = _val344 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.MAP: + self.executor_start_time_secs = {} + (_ktype352, _vtype353, _size351 ) = iprot.readMapBegin() + for _i355 in xrange(_size351): + _key356 = [] + (_etype361, _size358) = iprot.readListBegin() + for _i362 in xrange(_size358): + _elem363 = iprot.readI64(); + _key356.append(_elem363) + iprot.readListEnd() + _val357 = iprot.readI64(); + self.executor_start_time_secs[_key356] = _val357 + iprot.readMapEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Assignment') + if self.master_code_dir is not None: + oprot.writeFieldBegin('master_code_dir', TType.STRING, 1) + oprot.writeString(self.master_code_dir.encode('utf-8')) + oprot.writeFieldEnd() 
+ if self.node_host is not None: + oprot.writeFieldBegin('node_host', TType.MAP, 2) + oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.node_host)) + for kiter364,viter365 in self.node_host.items(): + oprot.writeString(kiter364.encode('utf-8')) + oprot.writeString(viter365.encode('utf-8')) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.executor_node_port is not None: + oprot.writeFieldBegin('executor_node_port', TType.MAP, 3) + oprot.writeMapBegin(TType.LIST, TType.STRUCT, len(self.executor_node_port)) + for kiter366,viter367 in self.executor_node_port.items(): + oprot.writeListBegin(TType.I64, len(kiter366)) + for iter368 in kiter366: + oprot.writeI64(iter368) + oprot.writeListEnd() + viter367.write(oprot) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.executor_start_time_secs is not None: + oprot.writeFieldBegin('executor_start_time_secs', TType.MAP, 4) + oprot.writeMapBegin(TType.LIST, TType.I64, len(self.executor_start_time_secs)) + for kiter369,viter370 in self.executor_start_time_secs.items(): + oprot.writeListBegin(TType.I64, len(kiter369)) + for iter371 in kiter369: + oprot.writeI64(iter371) + oprot.writeListEnd() + oprot.writeI64(viter370) + oprot.writeMapEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.master_code_dir is None: + raise TProtocol.TProtocolException(message='Required field master_code_dir is unset!') + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class TopologyActionOptions: + """ + Attributes: + - kill_options + - rebalance_options + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'kill_options', (KillOptions, KillOptions.thrift_spec), None, ), # 1 + (2, 
TType.STRUCT, 'rebalance_options', (RebalanceOptions, RebalanceOptions.thrift_spec), None, ), # 2 + ) + + def __hash__(self): + return 0 + hash(self.kill_options) + hash(self.rebalance_options) + + def __init__(self, kill_options=None, rebalance_options=None,): + self.kill_options = kill_options + self.rebalance_options = rebalance_options + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.kill_options = KillOptions() + self.kill_options.read(iprot) + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRUCT: + self.rebalance_options = RebalanceOptions() + self.rebalance_options.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('TopologyActionOptions') + if self.kill_options is not None: + oprot.writeFieldBegin('kill_options', TType.STRUCT, 1) + self.kill_options.write(oprot) + oprot.writeFieldEnd() + if self.rebalance_options is not None: + oprot.writeFieldBegin('rebalance_options', TType.STRUCT, 2) + self.rebalance_options.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', 
'.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class StormBase: + """ + Attributes: + - name + - status + - num_workers + - component_executors + - launch_time_secs + - owner + - topology_action_options + - prev_status + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'name', None, None, ), # 1 + (2, TType.I32, 'status', None, None, ), # 2 + (3, TType.I32, 'num_workers', None, None, ), # 3 + (4, TType.MAP, 'component_executors', (TType.STRING,None,TType.I32,None), None, ), # 4 + (5, TType.I32, 'launch_time_secs', None, None, ), # 5 + (6, TType.STRING, 'owner', None, None, ), # 6 + (7, TType.STRUCT, 'topology_action_options', (TopologyActionOptions, TopologyActionOptions.thrift_spec), None, ), # 7 + (8, TType.I32, 'prev_status', None, None, ), # 8 + ) + + def __hash__(self): + return 0 + hash(self.name) + hash(self.status) + hash(self.num_workers) + hash(self.component_executors) + hash(self.launch_time_secs) + hash(self.owner) + hash(self.topology_action_options) + hash(self.prev_status) + + def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None,): + self.name = name + self.status = status + self.num_workers = num_workers + self.component_executors = component_executors + self.launch_time_secs = launch_time_secs + self.owner = owner + self.topology_action_options = topology_action_options + self.prev_status = prev_status + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if 
ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.name = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I32: + self.status = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.num_workers = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.MAP: + self.component_executors = {} + (_ktype373, _vtype374, _size372 ) = iprot.readMapBegin() + for _i376 in xrange(_size372): + _key377 = iprot.readString().decode('utf-8') + _val378 = iprot.readI32(); + self.component_executors[_key377] = _val378 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I32: + self.launch_time_secs = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.STRING: + self.owner = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.STRUCT: + self.topology_action_options = TopologyActionOptions() + self.topology_action_options.read(iprot) + else: + iprot.skip(ftype) + elif fid == 8: + if ftype == TType.I32: + self.prev_status = iprot.readI32(); + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('StormBase') + if self.name is not None: + oprot.writeFieldBegin('name', TType.STRING, 1) + oprot.writeString(self.name.encode('utf-8')) + oprot.writeFieldEnd() + if self.status is not None: + oprot.writeFieldBegin('status', TType.I32, 2) + oprot.writeI32(self.status) + oprot.writeFieldEnd() + if self.num_workers is not None: + oprot.writeFieldBegin('num_workers', TType.I32, 3) + oprot.writeI32(self.num_workers) + 
oprot.writeFieldEnd() + if self.component_executors is not None: + oprot.writeFieldBegin('component_executors', TType.MAP, 4) + oprot.writeMapBegin(TType.STRING, TType.I32, len(self.component_executors)) + for kiter379,viter380 in self.component_executors.items(): + oprot.writeString(kiter379.encode('utf-8')) + oprot.writeI32(viter380) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.launch_time_secs is not None: + oprot.writeFieldBegin('launch_time_secs', TType.I32, 5) + oprot.writeI32(self.launch_time_secs) + oprot.writeFieldEnd() + if self.owner is not None: + oprot.writeFieldBegin('owner', TType.STRING, 6) + oprot.writeString(self.owner.encode('utf-8')) + oprot.writeFieldEnd() + if self.topology_action_options is not None: + oprot.writeFieldBegin('topology_action_options', TType.STRUCT, 7) + self.topology_action_options.write(oprot) + oprot.writeFieldEnd() + if self.prev_status is not None: + oprot.writeFieldBegin('prev_status', TType.I32, 8) + oprot.writeI32(self.prev_status) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.name is None: + raise TProtocol.TProtocolException(message='Required field name is unset!') + if self.status is None: + raise TProtocol.TProtocolException(message='Required field status is unset!') + if self.num_workers is None: + raise TProtocol.TProtocolException(message='Required field num_workers is unset!') + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class ZKWorkerHeartbeat: + """ + Attributes: + - storm_id + - executor_stats + - time_secs + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'storm_id', None, None, ), # 1 + (2, TType.MAP, 'executor_stats', 
(TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec),TType.STRUCT,(ExecutorStats, ExecutorStats.thrift_spec)), None, ), # 2 + (3, TType.I32, 'time_secs', None, None, ), # 3 + ) + + def __hash__(self): + return 0 + hash(self.storm_id) + hash(self.executor_stats) + hash(self.time_secs) + + def __init__(self, storm_id=None, executor_stats=None, time_secs=None,): + self.storm_id = storm_id + self.executor_stats = executor_stats + self.time_secs = time_secs + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.storm_id = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.MAP: + self.executor_stats = {} + (_ktype382, _vtype383, _size381 ) = iprot.readMapBegin() + for _i385 in xrange(_size381): + _key386 = ExecutorInfo() + _key386.read(iprot) + _val387 = ExecutorStats() + _val387.read(iprot) + self.executor_stats[_key386] = _val387 + iprot.readMapEnd() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.time_secs = iprot.readI32(); + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('ZKWorkerHeartbeat') + if self.storm_id is not None: + oprot.writeFieldBegin('storm_id', TType.STRING, 1) + oprot.writeString(self.storm_id.encode('utf-8')) + oprot.writeFieldEnd() 
+ if self.executor_stats is not None: + oprot.writeFieldBegin('executor_stats', TType.MAP, 2) + oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.executor_stats)) + for kiter388,viter389 in self.executor_stats.items(): + kiter388.write(oprot) + viter389.write(oprot) + oprot.writeMapEnd() + oprot.writeFieldEnd() + if self.time_secs is not None: + oprot.writeFieldBegin('time_secs', TType.I32, 3) + oprot.writeI32(self.time_secs) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.storm_id is None: + raise TProtocol.TProtocolException(message='Required field storm_id is unset!') + if self.executor_stats is None: + raise TProtocol.TProtocolException(message='Required field executor_stats is unset!') + if self.time_secs is None: + raise TProtocol.TProtocolException(message='Required field time_secs is unset!') + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + class GetInfoOptions: + """ + Attributes: + - num_err_choice + """ + + thrift_spec = ( + None, # 0 + (1, TType.I32, 'num_err_choice', None, None, ), # 1 + ) + + def __hash__(self): + return 0 + hash(self.num_err_choice) + + def __init__(self, num_err_choice=None,): + self.num_err_choice = num_err_choice + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I32: + 
self.num_err_choice = iprot.readI32(); + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('GetInfoOptions') + if self.num_err_choice is not None: + oprot.writeFieldBegin('num_err_choice', TType.I32, 1) + oprot.writeI32(self.num_err_choice) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class DRPCRequest: """ Attributes: http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/storm.thrift ---------------------------------------------------------------------- diff --cc storm-core/src/storm.thrift index 3cc0eb9,066cb4f..04b6a1b --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@@ -244,55 -243,15 +244,64 @@@ struct SubmitOptions 2: optional Credentials creds; } +struct SupervisorInfo { + 1: required i64 time_secs; + 2: required string hostname; + 3: optional string assignment_id; + 4: optional list<i64> used_ports; + 5: optional list<i64> meta; + 6: optional map<string, string> scheduler_meta; + 7: optional i64 uptime_secs; +} +struct NodeInfo { + 1: required string node; + 2: required set<i64> port; +} + +struct Assignment { + 1: required string master_code_dir; + 2: optional map<string, string> node_host = {}; + 3: optional map<list<i64>, NodeInfo> executor_node_port = {}; + 4: optional map<list<i64>, i64> 
executor_start_time_secs = {}; +} + +enum TopologyStatus { + ACTIVE = 1, + INACTIVE = 2, + REBALANCING = 3, + KILLED = 4 +} + +union TopologyActionOptions { + 1: optional KillOptions kill_options; + 2: optional RebalanceOptions rebalance_options; +} + +struct StormBase { + 1: required string name; + 2: required TopologyStatus status; + 3: required i32 num_workers; + 4: optional map<string, i32> component_executors; + 5: optional i32 launch_time_secs; + 6: optional string owner; + 7: optional TopologyActionOptions topology_action_options; + 8: optional TopologyStatus prev_status;//currently only used during rebalance action. +} + +struct ZKWorkerHeartbeat { + 1: required string storm_id; + 2: required map<ExecutorInfo,ExecutorStats> executor_stats; + 3: required i32 time_secs; +} + enum NumErrorsChoice { + ALL, + NONE, + ONE + } + + struct GetInfoOptions { + 1: optional NumErrorsChoice num_err_choice; + } service Nimbus { void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);