Awesome, glad you got it working! 🍻
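
For anyone who finds this thread later: the two serialVersionUID values in the InvalidClassException mean that the storm.kafka.KafkaConfig used when the topology was submitted and the one loaded by the worker come from different builds. Below is a minimal sketch, not something taken from this thread, of how one might print the serialVersionUID of a class and the jar it was loaded from, using only JDK calls. The class name SerialVersionCheck and the helper describe are hypothetical names chosen for illustration. It defaults to java.util.ArrayList so that it runs without Storm on the classpath.

```java
import java.io.ObjectStreamClass;
import java.security.CodeSource;

public class SerialVersionCheck {

    // Hypothetical helper (not part of Storm): reports the serialVersionUID
    // the JVM computes or reads for a class, and where the class was loaded from.
    static String describe(Class<?> cls) {
        // lookup() returns null when the class is not Serializable
        ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
        String uid = (desc == null)
                ? "n/a (not Serializable)"
                : Long.toString(desc.getSerialVersionUID());

        // getCodeSource() is null for classes loaded by the bootstrap loader
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        String where = (src == null) ? "JDK/bootstrap" : String.valueOf(src.getLocation());

        return cls.getName() + " serialVersionUID=" + uid + " from " + where;
    }

    public static void main(String[] args) throws Exception {
        // Pass a fully qualified class name, e.g. storm.kafka.KafkaConfig,
        // when the relevant jars are on the classpath.
        String name = args.length > 0 ? args[0] : "java.util.ArrayList";
        System.out.println(describe(Class.forName(name)));
    }
}
```

Run once with the classpath used to submit the topology and once with the worker's classpath, passing storm.kafka.KafkaConfig as the argument. That should show which side carries which of the two UIDs from the log, and which jar supplies the class.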

On Friday, January 23, 2015, Margus Roo <[email protected]> wrote:

>  Yes, that was the problem. On my local machine, from which I uploaded the
> topology to the server, I was using the storm-0.9.1-incubating version, and
> over time I had perhaps added some unnecessary libs to it.
> After I downloaded a fresh Storm 0.9.3 and used it to upload the topology,
> the problem was gone.
>
> Many thanks for pointing it out, and one virtual beer to Curtis :)
>
> Margus (margusja) Roo http://margus.roo.ee
> skype: margusja
> +372 51 480
>
> On 23/01/15 00:59, Curtis Allen wrote:
>
>  Looks like you have two versions of the Kafka spout on your classpath.
> What output do you get when you run the following command from your project
> directory?
>
> mvn dependency:tree | grep kafka
>
> On Thu, Jan 22, 2015 at 2:55 PM, Margus Roo <[email protected]> wrote:
>
>>  I simplified my code and still get the same problem.
>>
>> code
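>> import backtype.storm.Config;
>> import backtype.storm.StormSubmitter;
>> import backtype.storm.spout.SchemeAsMultiScheme;
>> import storm.kafka.BrokerHosts;
>> import storm.kafka.StringScheme;
>> import storm.kafka.ZkHosts;
>> import storm.kafka.trident.OpaqueTridentKafkaSpout;
>> import storm.kafka.trident.TridentKafkaConfig;
>> import storm.trident.Stream;
>> import storm.trident.TridentTopology;
>>
>> public class Topology {
>>
>>     public static void main(String[] args) throws Exception {
>>         Config conf = new Config();
>>         conf.setDebug(true);
>>
>>         TridentTopology topology = new TridentTopology();
>>         // ZooKeeper that Kafka registers its brokers in
>>         BrokerHosts zk = new ZkHosts("bigdata14:2181");
>>
>>         // Read the "demo" topic and decode each message as a string
>>         TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "demo");
>>         spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>         OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>>
>>         Stream spoutStream = topology.newStream("kafka-stream", spout);
>>         spoutStream.broadcast();
>>
>>         // if you wish to run your job on a remote cluster
>>         conf.setNumWorkers(4);
>>         conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
>>         conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
>>         conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
>>         conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
>>         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 512);
>>
>>         // args[0] is the topology name
>>         StormSubmitter.submitTopology(args[0], conf, topology.build());
>>     }
>> }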
>>
>> and log:
>>
>> 2015-01-22T23:52:20.670+0200 o.a.s.z.ClientCnxn [INFO] Session
>> establishment complete on server localhost/127.0.0.1:2181, sessionid =
>> 0x14b12e178cd034c, negotiated timeout = 20000
>> 2015-01-22T23:52:20.671+0200 o.a.s.c.f.s.ConnectionStateManager [INFO]
>> State change: CONNECTED
>> 2015-01-22T23:52:20.672+0200 b.s.zookeeper [INFO] Zookeeper state update:
>> :connected:none
>> 2015-01-22T23:52:21.685+0200 o.a.s.z.ZooKeeper [INFO] Session:
>> 0x14b12e178cd034c closed
>> 2015-01-22T23:52:21.685+0200 o.a.s.z.ClientCnxn [INFO] EventThread shut
>> down
>> 2015-01-22T23:52:21.687+0200 b.s.u.StormBoundedExponentialBackoffRetry
>> [INFO] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries
>> [5]
>> 2015-01-22T23:52:21.687+0200 o.a.s.c.f.i.CuratorFrameworkImpl [INFO]
>> Starting
>> 2015-01-22T23:52:21.687+0200 o.a.s.z.ZooKeeper [INFO] Initiating client
>> connection, connectString=localhost:2181/storm sessionTimeout=20000
>> watcher=org.apache.storm.curator.ConnectionState@9ca62e2
>> 2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Opening socket
>> connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to
>> authenticate using SASL (unknown error)
>> 2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Socket connection
>> established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
>> 2015-01-22T23:52:21.694+0200 o.a.s.z.ClientCnxn [INFO] Session
>> establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid
>> = 0x14b12e178cd034e, negotiated timeout = 20000
>> 2015-01-22T23:52:21.694+0200 o.a.s.c.f.s.ConnectionStateManager [INFO]
>> State change: CONNECTED
>> 2015-01-22T23:52:21.718+0200 b.s.d.worker [INFO] Reading Assignments.
>> 2015-01-22T23:52:21.746+0200 b.s.m.TransportFactory [INFO] Storm peer
>> transport plugin:backtype.storm.messaging.netty.Context
>> 2015-01-22T23:52:21.872+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
>> maxRetries too large (300). Pinning to 29
>> 2015-01-22T23:52:21.873+0200 b.s.u.StormBoundedExponentialBackoffRetry
>> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
>> [300]
>> 2015-01-22T23:52:21.873+0200 b.s.m.n.Client [INFO] New Netty Client,
>> connect to bigdata17, 6702, config: , buffer_size: 5242880
>> 2015-01-22T23:52:21.879+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
>> maxRetries too large (300). Pinning to 29
>> 2015-01-22T23:52:21.879+0200 b.s.u.StormBoundedExponentialBackoffRetry
>> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
>> [300]
>> 2015-01-22T23:52:21.879+0200 b.s.m.n.Client [INFO] New Netty Client,
>> connect to bigdata17, 6701, config: , buffer_size: 5242880
>> 2015-01-22T23:52:21.880+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
>> maxRetries too large (300). Pinning to 29
>> 2015-01-22T23:52:21.880+0200 b.s.u.StormBoundedExponentialBackoffRetry
>> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
>> [300]
>> 2015-01-22T23:52:21.880+0200 b.s.m.n.Client [INFO] New Netty Client,
>> connect to bigdata17, 6700, config: , buffer_size: 5242880
>> 2015-01-22T23:52:21.882+0200 b.s.m.n.Client [INFO] Reconnect started for
>> Netty-Client-bigdata17/192.168.80.214:6702... [0]
>> 2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect started for
>> Netty-Client-bigdata17/192.168.80.214:6701... [0]
>> 2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect started for
>> Netty-Client-bigdata17/192.168.80.214:6700... [0]
>> 2015-01-22T23:52:22.001+0200 b.s.m.n.Client [INFO] Reconnect started for
>> Netty-Client-bigdata17/192.168.80.214:6701... [1]
>> 2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect started for
>> Netty-Client-bigdata17/192.168.80.214:6700... [1]
>> 2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect started for
>> Netty-Client-bigdata17/192.168.80.214:6702... [1]
>> 2015-01-22T23:52:22.068+0200 b.s.d.executor [INFO] Loading executor
>> __acker:[5 5]
>> 2015-01-22T23:52:22.073+0200 b.s.d.task [INFO] Emitting: __acker __system
>> ["startup"]
>> 2015-01-22T23:52:22.074+0200 b.s.d.executor [INFO] Loaded executor tasks
>> __acker:[5 5]
>> 2015-01-22T23:52:22.080+0200 b.s.d.executor [INFO] Timeouts disabled for
>> executor __acker:[5 5]
>> 2015-01-22T23:52:22.081+0200 b.s.d.executor [INFO] Finished loading
>> executor __acker:[5 5]
>> 2015-01-22T23:52:22.087+0200 b.s.d.executor [INFO] Preparing bolt
>> __acker:(5)
>> 2015-01-22T23:52:22.088+0200 b.s.d.executor [INFO] Loading executor
>> spout0:[9 9]
>> 2015-01-22T23:52:22.091+0200 b.s.d.executor [INFO] Prepared bolt
>> __acker:(5)
>> 2015-01-22T23:52:22.104+0200 b.s.m.n.Client [INFO] Reconnect started for
>> Netty-Client-bigdata17/192.168.80.214:6700... [2]
>> 2015-01-22T23:52:22.104+0200 b.s.d.worker [ERROR] Error on initialization
>> of server mk-worker
>> *java.lang.RuntimeException: java.io.InvalidClassException:
>> storm.kafka.KafkaConfig; local class incompatible: stream classdesc
>> serialVersionUID = 1806199026298360819, local class serialVersionUID =
>> 862253719916491316*
>>         at
>> backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.utils.Utils.deserialize(Utils.java:89)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at
>> backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at
>> backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.util$assoc_apply_self.invoke(util.clj:850)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>          at backtype.storm.daemon.task$mk_task.invoke(task.clj:184)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at
>> backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at clojure.core$map$fn__4207.invoke(core.clj:2485)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.LazySeq.sval(LazySeq.java:42)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.LazySeq.seq(LazySeq.java:60)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
>>         at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
>>         at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.core.protocols$fn__6026.invoke(protocols.clj:54)
>> ~[clojure-1.5.1.jar:na]
>>         at
>> clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.core$reduce.invoke(core.clj:6177)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
>>         at
>> backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at
>> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at clojure.lang.LazySeq.sval(LazySeq.java:42)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.LazySeq.seq(LazySeq.java:60)
>> ~[clojure-1.5.1.jar:na]
>>          at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.LazySeq.next(LazySeq.java:92)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.RT.next(RT.java:598) ~[clojure-1.5.1.jar:na]
>>         at clojure.core$next.invoke(core.clj:64) ~[clojure-1.5.1.jar:na]
>>         at clojure.core$dorun.invoke(core.clj:2781)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.core$doall.invoke(core.clj:2796)
>> ~[clojure-1.5.1.jar:na]
>>         at
>> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at clojure.lang.AFn.applyToHelper(AFn.java:185)
>> [clojure-1.5.1.jar:na]
>>         at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
>>         at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
>>         at
>> backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354)
>> [storm-core-0.9.3.jar:0.9.3]
>>         at clojure.lang.RestFn.invoke(RestFn.java:512)
>> [clojure-1.5.1.jar:na]
>>         at backtype.storm.daemon.worker$_main.invoke(worker.clj:461)
>> [storm-core-0.9.3.jar:0.9.3]
>>         at clojure.lang.AFn.applyToHelper(AFn.java:172)
>> [clojure-1.5.1.jar:na]
>>         at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
>>         at backtype.storm.daemon.worker.main(Unknown Source)
>> [storm-core-0.9.3.jar:0.9.3]
>>
>>  Margus (margusja) Roo http://margus.roo.ee
>> skype: margusja
>> +372 51 480
>>
>>   On 22/01/15 20:10, Margus Roo wrote:
>>
>> Hi
>>
>> 2015-01-22T20:06:06.175+0200 b.s.u.StormBoundedExponentialBackoffRetry
>> [INFO] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries
>> [5]
>> 2015-01-22T20:06:06.175+0200 o.a.s.c.f.i.CuratorFrameworkImpl [INFO]
>> Starting
>> 2015-01-22T20:06:06.175+0200 o.a.s.z.ZooKeeper [INFO] Initiating client
>> connection, connectString=localhost:2181/storm sessionTimeout=20000
>> watcher=org.apache.storm.curator.ConnectionState@5c23f9fd
>> 2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Opening socket
>> connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to
>> authenticate using SASL (unknown error)
>> 2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Socket connection
>> established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
>> 2015-01-22T20:06:06.182+0200 o.a.s.z.ClientCnxn [INFO] Session
>> establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid
>> = 0x14b016362340b5f, negotiated timeout = 20000
>> 2015-01-22T20:06:06.182+0200 o.a.s.c.f.s.ConnectionStateManager [INFO]
>> State change: CONNECTED
>> 2015-01-22T20:06:06.201+0200 b.s.d.worker [INFO] Reading Assignments.
>> 2015-01-22T20:06:06.226+0200 b.s.m.TransportFactory [INFO] Storm peer
>> transport plugin:backtype.storm.messaging.netty.Context
>> 2015-01-22T20:06:06.318+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
>> maxRetries too large (300). Pinning to 29
>> 2015-01-22T20:06:06.318+0200 b.s.u.StormBoundedExponentialBackoffRetry
>> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
>> [300]
>> 2015-01-22T20:06:06.318+0200 b.s.m.n.Client [INFO] New Netty Client,
>> connect to bigdata17, 6703, config: , buffer_size: 5242880
>> 2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] Reconnect started for
>> Netty-Client-bigdata17/192.168.80.214:6703... [0]
>> 2015-01-22T20:06:06.323+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
>> maxRetries too large (300). Pinning to 29
>> 2015-01-22T20:06:06.323+0200 b.s.u.StormBoundedExponentialBackoffRetry
>> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
>> [300]
>> 2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] New Netty Client,
>> connect to bigdata17, 6702, config: , buffer_size: 5242880
>> 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect started for
>> Netty-Client-bigdata17/192.168.80.214:6702... [0]
>> 2015-01-22T20:06:06.324+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN]
>> maxRetries too large (300). Pinning to 29
>> 2015-01-22T20:06:06.324+0200 b.s.u.StormBoundedExponentialBackoffRetry
>> [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries
>> [300]
>> 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] New Netty Client,
>> connect to bigdata17, 6701, config: , buffer_size: 5242880
>> 2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect started for
>> Netty-Client-bigdata17/192.168.80.214:6701... [0]
>> 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established
>> to a remote host Netty-Client-bigdata17/192.168.80.214:6702, [id:
>> 0xffc67e3c, /192.168.80.214:55264 => bigdata17/192.168.80.214:6702]
>> 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established
>> to a remote host Netty-Client-bigdata17/192.168.80.214:6701, [id:
>> 0x54e2a4e9, /192.168.80.214:55748 => bigdata17/192.168.80.214:6701]
>> 2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established
>> to a remote host Netty-Client-bigdata17/192.168.80.214:6703, [id:
>> 0xc78c04d0, /192.168.80.214:42319 => bigdata17/192.168.80.214:6703]
>> 2015-01-22T20:06:06.503+0200 b.s.d.executor [INFO] Loading executor
>> spout:[5 5]
>> 2015-01-22T20:06:06.517+0200 b.s.d.worker [ERROR] Error on initialization
>> of server mk-worker
>> java.lang.RuntimeException: java.io.InvalidClassException:
>> storm.kafka.KafkaConfig; local class incompatible: stream classdesc
>> serialVersionUID = 1806199026298360819, local class serialVersionUID =
>> 862253719916491316
>>         at
>> backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.utils.Utils.deserialize(Utils.java:89)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at
>> backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at
>> backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.util$assoc_apply_self.invoke(util.clj:850)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.daemon.task$mk_task.invoke(task.clj:184)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at
>> backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at clojure.core$map$fn__4207.invoke(core.clj:2485)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.LazySeq.sval(LazySeq.java:42)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.LazySeq.seq(LazySeq.java:60)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
>>         at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
>>         at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.core.protocols$fn__6026.invoke(protocols.clj:54)
>> ~[clojure-1.5.1.jar:na]
>>         at
>> clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.core$reduce.invoke(core.clj:6177)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
>>         at
>> backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at
>> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at clojure.lang.LazySeq.sval(LazySeq.java:42)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.LazySeq.seq(LazySeq.java:60)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
>>         at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
>>         at clojure.core$dorun.invoke(core.clj:2780)
>> ~[clojure-1.5.1.jar:na]
>>         at clojure.core$doall.invoke(core.clj:2796)
>> ~[clojure-1.5.1.jar:na]
>>         at
>> backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382)
>> ~[storm-core-0.9.3.jar:0.9.3]
>>         at clojure.lang.AFn.applyToHelper(AFn.java:185)
>> [clojure-1.5.1.jar:na]
>>         at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
>>         at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
>>         at
>> backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354)
>> [storm-core-0.9.3.jar:0.9.3]
>>         at clojure.lang.RestFn.invoke(RestFn.java:512)
>> [clojure-1.5.1.jar:na]
>>         at backtype.storm.daemon.worker$_main.invoke(worker.clj:461)
>> [storm-core-0.9.3.jar:0.9.3]
>>         at clojure.lang.AFn.applyToHelper(AFn.java:172)
>> [clojure-1.5.1.jar:na]
>>         at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
>>         at backtype.storm.daemon.worker.main(Unknown Source)
>> [storm-core-0.9.3.jar:0.9.3]
>> *Caused by: java.io.InvalidClassException: storm.kafka.KafkaConfig; local
>> class incompatible: stream classdesc serialVersionUID =
>> 1806199026298360819, local class serialVersionUID = 862253719916491316*
>>
>>
>> I am using Storm Nimbus and Supervisor 0.9.3.
>>
>> I build the topology using:
>>     <dependency>
>>         <groupId>org.apache.storm</groupId>
>>         <artifactId>storm-kafka</artifactId>
>>         <version>0.9.3</version>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.storm</groupId>
>>         <artifactId>storm-core</artifactId>
>>         <version>0.9.3</version>
>>         <scope>provided</scope>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.kafka</groupId>
>>         <artifactId>kafka_2.10</artifactId>
>>         <version>0.8.2-beta</version>
>>     </dependency>
>>
>> There is no bolt, only one spout:
>>
>> import backtype.storm.Config;
>> import backtype.storm.LocalCluster;
>> import backtype.storm.StormSubmitter;
>> import backtype.storm.topology.TopologyBuilder;
>> import storm.kafka.Broker;
>> import storm.kafka.KafkaSpout;
>> import storm.kafka.SpoutConfig;
>> import storm.kafka.StaticHosts;
>> import storm.kafka.trident.GlobalPartitionInformation;
>>
>> public class Topology {
>>     public static void main(String[] args) throws Exception {
>>         Config conf = new Config();
>>         conf.setDebug(false);
>>
>>         TopologyBuilder builder = new TopologyBuilder();
>>
>>         // Statically map partition 0 of the topic to its broker
>>         Broker brokerForPartition0 = new Broker("bigdata14");
>>         GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
>>         partitionInfo.addPartition(0, brokerForPartition0);
>>         StaticHosts hosts = new StaticHosts(partitionInfo);
>>
>>         SpoutConfig spoutConfig = new SpoutConfig(hosts,
>>                 "demo",
>>                 "/",  // zookeeper root path for offset storing
>>                 "stormdemo");
>>         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>>
>>         builder.setSpout("spout", kafkaSpout);
>>
>>         if (args.length > 0) {
>>             // if you wish to run your job on a remote cluster
>>             conf.setNumWorkers(4);
>>             conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
>>             conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
>>             conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
>>             conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
>>             conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
>>
>>             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
>>         } else {
>>             // if you wish to run and test your job locally
>>             conf.setMaxTaskParallelism(1);
>>
>>             LocalCluster cluster = new LocalCluster();
>>             cluster.submitTopology("kafkatopology", conf, builder.createTopology());
>>             Thread.sleep(10000);
>>             cluster.shutdown();
>>         }
>>     }
>> }
>>
>>
>> Any ideas?
>>
>> --
>> Margus (margusja) Roo http://margus.roo.ee
>> skype: margusja
>> +372 51 480
>>
>>
>>
>
>
