Awesome glad you got it working! 🍻 On Friday, January 23, 2015, Margus Roo <[email protected]> wrote:
> Yes that's was the problem. In my local machine where from I uploaded > topology to server I used storm-0.9.1-incupator version and over the time I > perhaps added some unnecessary libs into it. > So downloaded fresh storm 0.9.3 and used it to upload topology the problem > was gone. > > So many thanks to pointing it out and one virtual beer to Curtis :) > > Margus (margusja) Roohttp://margus.roo.ee > skype: margusja > +372 51 480 > > On 23/01/15 00:59, Curtis Allen wrote: > > Looks like you have 2 versions of the kafka spout in your classpath. > What output do you get when you run the following command from your project > directory? > > mvn dependency:tree| grep kafka > > > On Thu, Jan 22, 2015 at 2:55 PM, Margus Roo <[email protected] > <javascript:_e(%7B%7D,'cvml','[email protected]');>> wrote: > >> I made my code more simple and still the same problem. >> >> code >> public class Topology { >> >> >> >> public static void main(String[] args) throws Exception { >> Config conf = new Config(); >> conf.setDebug(true); >> >> >> TridentTopology topology = new TridentTopology(); >> BrokerHosts zk = new ZkHosts("bigdata14:2181"); >> >> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, >> "demo"); >> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); >> OpaqueTridentKafkaSpout spout = new >> OpaqueTridentKafkaSpout(spoutConf); >> >> Stream spoutStream = topology.newStream("kafka-stream", >> spout); >> spoutStream.broadcast(); >> >> >> // if you wish to run your job on a remote cluster >> conf.setNumWorkers(4); >> conf.put(Config.NIMBUS_THRIFT_PORT, 6627); >> conf.put(Config.STORM_ZOOKEEPER_PORT, 2181); >> conf.put(Config.WORKER_CHILDOPTS , "-Xmx4096m"); >> conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS , "-Xmx4096m"); >> conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING , 512); >> >> StormSubmitter.submitTopology(args[0], conf, >> topology.build()); >> >> } >> } >> >> and log: >> >> 2015-01-22T23:52:20.670+0200 o.a.s.z.ClientCnxn [INFO] Session >> establishment complete on server localhost/127.0.0.1:2181, sessionid = >> 0x14b12e178cd034c, negotiated timeout = 20000 >> 2015-01-22T23:52:20.671+0200 o.a.s.c.f.s.ConnectionStateManager [INFO] >> State change: CONNECTED >> 2015-01-22T23:52:20.672+0200 b.s.zookeeper [INFO] Zookeeper state update: >> :connected:none >> 2015-01-22T23:52:21.685+0200 o.a.s.z.ZooKeeper [INFO] Session: >> 0x14b12e178cd034c closed >> 2015-01-22T23:52:21.685+0200 o.a.s.z.ClientCnxn [INFO] EventThread shut >> down >> 2015-01-22T23:52:21.687+0200 b.s.u.StormBoundedExponentialBackoffRetry >> [INFO] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries >> [5] >> 2015-01-22T23:52:21.687+0200 o.a.s.c.f.i.CuratorFrameworkImpl [INFO] >> Starting >> 2015-01-22T23:52:21.687+0200 o.a.s.z.ZooKeeper [INFO] Initiating client >> connection, connectString=localhost:2181/storm sessionTimeout=20000 >> watcher=org.apache.storm.curator.ConnectionState@9ca62e2 >> 2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Opening socket >> connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to >> authenticate using SASL (unknown error) >> 2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Socket connection >> established to localhost/0:0:0:0:0:0:0:1:2181, initiating session >> 2015-01-22T23:52:21.694+0200 o.a.s.z.ClientCnxn [INFO] Session >> establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid >> = 0x14b12e178cd034e, negotiated timeout = 20000 >> 2015-01-22T23:52:21.694+0200 o.a.s.c.f.s.ConnectionStateManager [INFO] >> State change: CONNECTED >> 2015-01-22T23:52:21.718+0200 b.s.d.worker [INFO] Reading Assignments. >> 2015-01-22T23:52:21.746+0200 b.s.m.TransportFactory [INFO] Storm peer >> transport plugin:backtype.storm.messaging.netty.Context >> 2015-01-22T23:52:21.872+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] >> maxRetries too large (300). Pinning to 29 >> 2015-01-22T23:52:21.873+0200 b.s.u.StormBoundedExponentialBackoffRetry >> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries >> [300] >> 2015-01-22T23:52:21.873+0200 b.s.m.n.Client [INFO] New Netty Client, >> connect to bigdata17, 6702, config: , buffer_size: 5242880 >> 2015-01-22T23:52:21.879+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] >> maxRetries too large (300). Pinning to 29 >> 2015-01-22T23:52:21.879+0200 b.s.u.StormBoundedExponentialBackoffRetry >> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries >> [300] >> 2015-01-22T23:52:21.879+0200 b.s.m.n.Client [INFO] New Netty Client, >> connect to bigdata17, 6701, config: , buffer_size: 5242880 >> 2015-01-22T23:52:21.880+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] >> maxRetries too large (300). Pinning to 29 >> 2015-01-22T23:52:21.880+0200 b.s.u.StormBoundedExponentialBackoffRetry >> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries >> [300] >> 2015-01-22T23:52:21.880+0200 b.s.m.n.Client [INFO] New Netty Client, >> connect to bigdata17, 6700, config: , buffer_size: 5242880 >> 2015-01-22T23:52:21.882+0200 b.s.m.n.Client [INFO] Reconnect started for >> Netty-Client-bigdata17/192.168.80.214:6702... [0] >> 2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect started for >> Netty-Client-bigdata17/192.168.80.214:6701... [0] >> 2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect started for >> Netty-Client-bigdata17/192.168.80.214:6700... [0] >> 2015-01-22T23:52:22.001+0200 b.s.m.n.Client [INFO] Reconnect started for >> Netty-Client-bigdata17/192.168.80.214:6701... [1] >> 2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect started for >> Netty-Client-bigdata17/192.168.80.214:6700... [1] >> 2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect started for >> Netty-Client-bigdata17/192.168.80.214:6702... [1] >> 2015-01-22T23:52:22.068+0200 b.s.d.executor [INFO] Loading executor >> __acker:[5 5] >> 2015-01-22T23:52:22.073+0200 b.s.d.task [INFO] Emitting: __acker __system >> ["startup"] >> 2015-01-22T23:52:22.074+0200 b.s.d.executor [INFO] Loaded executor tasks >> __acker:[5 5] >> 2015-01-22T23:52:22.080+0200 b.s.d.executor [INFO] Timeouts disabled for >> executor __acker:[5 5] >> 2015-01-22T23:52:22.081+0200 b.s.d.executor [INFO] Finished loading >> executor __acker:[5 5] >> 2015-01-22T23:52:22.087+0200 b.s.d.executor [INFO] Preparing bolt >> __acker:(5) >> 2015-01-22T23:52:22.088+0200 b.s.d.executor [INFO] Loading executor >> spout0:[9 9] >> 2015-01-22T23:52:22.091+0200 b.s.d.executor [INFO] Prepared bolt >> __acker:(5) >> 2015-01-22T23:52:22.104+0200 b.s.m.n.Client [INFO] Reconnect started for >> Netty-Client-bigdata17/192.168.80.214:6700... [2] >> 2015-01-22T23:52:22.104+0200 b.s.d.worker [ERROR] Error on initialization >> of server mk-worker >> *java.lang.RuntimeException: java.io.InvalidClassException: >> storm.kafka.KafkaConfig; local class incompatible: stream classdesc >> serialVersionUID = 1806199026298360819, local class serialVersionUID = >> 862253719916491316* >> at >> backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.utils.Utils.deserialize(Utils.java:89) >> ~[storm-core-0.9.3.jar:0.9.3] >> at >> backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73) >> ~[storm-core-0.9.3.jar:0.9.3] >> at >> backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.util$assoc_apply_self.invoke(util.clj:850) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.daemon.task$mk_task.invoke(task.clj:184) >> ~[storm-core-0.9.3.jar:0.9.3] >> at >> backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323) >> ~[storm-core-0.9.3.jar:0.9.3] >> at clojure.core$map$fn__4207.invoke(core.clj:2485) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.LazySeq.sval(LazySeq.java:42) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.LazySeq.seq(LazySeq.java:60) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na] >> at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na] >> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) >> ~[clojure-1.5.1.jar:na] >> at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) >> ~[clojure-1.5.1.jar:na] >> at >> clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) >> ~[clojure-1.5.1.jar:na] >> at clojure.core$reduce.invoke(core.clj:6177) >> ~[clojure-1.5.1.jar:na] >> at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na] >> at >> backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323) >> ~[storm-core-0.9.3.jar:0.9.3] >> at >> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382) >> ~[storm-core-0.9.3.jar:0.9.3] >> at clojure.lang.LazySeq.sval(LazySeq.java:42) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.LazySeq.seq(LazySeq.java:60) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.5.1.jar:na] >> at clojure.lang.LazySeq.next(LazySeq.java:92) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.RT.next(RT.java:598) ~[clojure-1.5.1.jar:na] >> at clojure.core$next.invoke(core.clj:64) ~[clojure-1.5.1.jar:na] >> at clojure.core$dorun.invoke(core.clj:2781) >> ~[clojure-1.5.1.jar:na] >> at clojure.core$doall.invoke(core.clj:2796) >> ~[clojure-1.5.1.jar:na] >> at >> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382) >> ~[storm-core-0.9.3.jar:0.9.3] >> at clojure.lang.AFn.applyToHelper(AFn.java:185) >> [clojure-1.5.1.jar:na] >> at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] >> at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na] >> at >> backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354) >> [storm-core-0.9.3.jar:0.9.3] >> at clojure.lang.RestFn.invoke(RestFn.java:512) >> [clojure-1.5.1.jar:na] >> at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) >> [storm-core-0.9.3.jar:0.9.3] >> at clojure.lang.AFn.applyToHelper(AFn.java:172) >> [clojure-1.5.1.jar:na] >> at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] >> at backtype.storm.daemon.worker.main(Unknown Source) >> [storm-core-0.9.3.jar:0.9.3] >> >> Margus (margusja) Roohttp://margus.roo.ee >> skype: margusja >> +372 51 480 >> >> On 22/01/15 20:10, Margus Roo wrote: >> >> Hi >> >> 2015-01-22T20:06:06.175+0200 b.s.u.StormBoundedExponentialBackoffRetry >> [INFO] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries >> [5] >> 2015-01-22T20:06:06.175+0200 o.a.s.c.f.i.CuratorFrameworkImpl [INFO] >> Starting >> 2015-01-22T20:06:06.175+0200 o.a.s.z.ZooKeeper [INFO] Initiating client >> connection, connectString=localhost:2181/storm sessionTimeout=20000 >> watcher=org.apache.storm.curator.ConnectionState@5c23f9fd >> 2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Opening socket >> connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to >> authenticate using SASL (unknown error) >> 2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Socket connection >> established to localhost/0:0:0:0:0:0:0:1:2181, initiating session >> 2015-01-22T20:06:06.182+0200 o.a.s.z.ClientCnxn [INFO] Session >> establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid >> = 0x14b016362340b5f, negotiated timeout = 20000 >> 2015-01-22T20:06:06.182+0200 o.a.s.c.f.s.ConnectionStateManager [INFO] >> State change: CONNECTED >> 2015-01-22T20:06:06.201+0200 b.s.d.worker [INFO] Reading Assignments. >> 2015-01-22T20:06:06.226+0200 b.s.m.TransportFactory [INFO] Storm peer >> transport plugin:backtype.storm.messaging.netty.Context >> 2015-01-22T20:06:06.318+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] >> maxRetries too large (300). Pinning to 29 >> 2015-01-22T20:06:06.318+0200 b.s.u.StormBoundedExponentialBackoffRetry >> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries >> [300] >> 2015-01-22T20:06:06.318+0200 b.s.m.n.Client [INFO] New Netty Client, >> connect to bigdata17, 6703, config: , buffer_size: 5242880 >> 2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] Reconnect started for >> Netty-Client-bigdata17/192.168.80.214:6703... [0] >> 2015-01-22T20:06:06.323+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] >> maxRetries too large (300). Pinning to 29 >> 2015-01-22T20:06:06.323+0200 b.s.u.StormBoundedExponentialBackoffRetry >> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries >> [300] >> 2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] New Netty Client, >> connect to bigdata17, 6702, config: , buffer_size: 5242880 >> 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect started for >> Netty-Client-bigdata17/192.168.80.214:6702... [0] >> 2015-01-22T20:06:06.324+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] >> maxRetries too large (300). Pinning to 29 >> 2015-01-22T20:06:06.324+0200 b.s.u.StormBoundedExponentialBackoffRetry >> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries >> [300] >> 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] New Netty Client, >> connect to bigdata17, 6701, config: , buffer_size: 5242880 >> 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect started for >> Netty-Client-bigdata17/192.168.80.214:6701... [0] >> 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established >> to a remote host Netty-Client-bigdata17/192.168.80.214:6702, [id: >> 0xffc67e3c, /192.168.80.214:55264 => bigdata17/192.168.80.214:6702] >> 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established >> to a remote host Netty-Client-bigdata17/192.168.80.214:6701, [id: >> 0x54e2a4e9, /192.168.80.214:55748 => bigdata17/192.168.80.214:6701] >> 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established >> to a remote host Netty-Client-bigdata17/192.168.80.214:6703, [id: >> 0xc78c04d0, /192.168.80.214:42319 => bigdata17/192.168.80.214:6703] >> 2015-01-22T20:06:06.503+0200 b.s.d.executor [INFO] Loading executor >> spout:[5 5] >> 2015-01-22T20:06:06.517+0200 b.s.d.worker [ERROR] Error on initialization >> of server mk-worker >> java.lang.RuntimeException: java.io.InvalidClassException: >> storm.kafka.KafkaConfig; local class incompatible: stream classdesc >> serialVersionUID = 1806199026298360819, local class serialVersionUID = >> 862253719916491316 >> at >> backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.utils.Utils.deserialize(Utils.java:89) >> ~[storm-core-0.9.3.jar:0.9.3] >> at >> backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73) >> ~[storm-core-0.9.3.jar:0.9.3] >> at >> backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.util$assoc_apply_self.invoke(util.clj:850) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173) >> ~[storm-core-0.9.3.jar:0.9.3] >> >> 604,1 91% >> at backtype.storm.util$assoc_apply_self.invoke(util.clj:850) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173) >> ~[storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.daemon.task$mk_task.invoke(task.clj:184) >> ~[storm-core-0.9.3.jar:0.9.3] >> at >> backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323) >> ~[storm-core-0.9.3.jar:0.9.3] >> at clojure.core$map$fn__4207.invoke(core.clj:2485) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.LazySeq.sval(LazySeq.java:42) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.LazySeq.seq(LazySeq.java:60) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na] >> at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na] >> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) >> ~[clojure-1.5.1.jar:na] >> at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) >> ~[clojure-1.5.1.jar:na] >> at >> clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) >> ~[clojure-1.5.1.jar:na] >> at clojure.core$reduce.invoke(core.clj:6177) >> ~[clojure-1.5.1.jar:na] >> at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na] >> at >> backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323) >> ~[storm-core-0.9.3.jar:0.9.3] >> at >> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382) >> ~[storm-core-0.9.3.jar:0.9.3] >> at clojure.lang.LazySeq.sval(LazySeq.java:42) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.LazySeq.seq(LazySeq.java:60) >> ~[clojure-1.5.1.jar:na] >> at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na] >> at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na] >> at clojure.core$dorun.invoke(core.clj:2780) >> ~[clojure-1.5.1.jar:na] >> at clojure.core$doall.invoke(core.clj:2796) >> ~[clojure-1.5.1.jar:na] >> at >> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382) >> ~[storm-core-0.9.3.jar:0.9.3] >> at clojure.lang.AFn.applyToHelper(AFn.java:185) >> [clojure-1.5.1.jar:na] >> at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] >> at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na] >> at >> backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354) >> [storm-core-0.9.3.jar:0.9.3] >> at clojure.lang.RestFn.invoke(RestFn.java:512) >> [clojure-1.5.1.jar:na] >> at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) >> [storm-core-0.9.3.jar:0.9.3] >> at clojure.lang.AFn.applyToHelper(AFn.java:172) >> [clojure-1.5.1.jar:na] >> at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] >> at backtype.storm.daemon.worker.main(Unknown Source) >> [storm-core-0.9.3.jar:0.9.3] >> *Caused by: java.io.InvalidClassException: storm.kafka.KafkaConfig; local >> class incompatible: stream classdesc serialVersionUID = >> 1806199026298360819, local class serialVersionUID = 862253719916491316* >> >> >> I am using storm numbus and supervisor 0.9.3 >> >> build topology using: >> <dependency> >> <groupId>org.apache.storm</groupId> >> <artifactId>storm-kafka</artifactId> >> <version>0.9.3</version> >> </dependency> >> <dependency> >> <groupId>org.apache.storm</groupId> >> <artifactId>storm-core</artifactId> >> <version>0.9.3</version> >> <scope>provided</scope> >> </dependency> >> <dependency> >> <groupId>org.apache.kafka</groupId> >> <artifactId>kafka_2.10</artifactId> >> <version>0.8.2-beta</version> >> </dependency> >> >> there is no bolt only one spout: >> >> public class Topology { >> public static void main(String[] args) throws Exception { >> Config conf = new Config(); >> conf.setDebug(false); >> >> TopologyBuilder builder = new TopologyBuilder(); >> >> Broker brokerForPartition0 = new Broker("bigdata14"); >> GlobalPartitionInformation partitionInfo = new >> GlobalPartitionInformation(); >> partitionInfo.addPartition(0, brokerForPartition0); >> StaticHosts hosts = new StaticHosts(partitionInfo); >> >> SpoutConfig spoutConfig = new SpoutConfig(hosts, >> "demo", >> "/", // zookeeper root path for offset storing >> "stormdemo"); >> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); >> >> builder.setSpout("spout", kafkaSpout); >> >> if (args.length > 0) { >> // if you wish to run your job on a remote cluster >> conf.setNumWorkers(4); >> conf.put(Config.NIMBUS_THRIFT_PORT, 6627); >> conf.put(Config.STORM_ZOOKEEPER_PORT, 2181); >> conf.put(Config.WORKER_CHILDOPTS , "-Xmx4096m"); >> conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS , "-Xmx4096m"); >> conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING , 2); >> >> StormSubmitter.submitTopology(args[0], conf, >> builder.createTopology()); >> } else { >> // if you wish to run and test your job locally >> conf.setMaxTaskParallelism(1); >> >> LocalCluster cluster = new LocalCluster(); >> cluster.submitTopology("kafkatopology", conf, >> builder.createTopology()); >> Thread.sleep(10000); >> cluster.shutdown(); >> } >> } >> } >> >> >> Any ideas? >> >> -- >> Margus (margusja) Roohttp://margus.roo.ee >> skype: margusja >> +372 51 480 >> >> >> > >
