Yes that's was the problem. In my local machine where from I uploaded topology to server I used storm-0.9.1-incupator version and over the time I perhaps added some unnecessary libs into it. So downloaded fresh storm 0.9.3 and used it to upload topology the problem was gone.

So many thanks to pointing it out and one virtual beer to Curtis :)

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 23/01/15 00:59, Curtis Allen wrote:

Looks like you have 2 versions of the kafka spout in your classpath. What output do you get when you run the following command from your project directory?

|mvn dependency:tree| grep kafka|

​

On Thu, Jan 22, 2015 at 2:55 PM, Margus Roo <[email protected] <mailto:[email protected]>> wrote:

    I made my code more simple and still the same problem.

    code
    public class Topology {



          public static void main(String[] args) throws Exception {
               Config conf = new Config();
               conf.setDebug(true);


               TridentTopology topology = new TridentTopology();
               BrokerHosts zk = new ZkHosts("bigdata14:2181");

               TridentKafkaConfig spoutConf = new
    TridentKafkaConfig(zk, "demo");
               spoutConf.scheme = new SchemeAsMultiScheme(new
    StringScheme());
               OpaqueTridentKafkaSpout spout = new
    OpaqueTridentKafkaSpout(spoutConf);

               Stream spoutStream = topology.newStream("kafka-stream",
                       spout);
               spoutStream.broadcast();


                    // if you wish to run your job on a remote cluster
                    conf.setNumWorkers(4);
                    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
                    conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
                    conf.put(Config.WORKER_CHILDOPTS , "-Xmx4096m");
    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS , "-Xmx4096m");
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING  , 512);

                    StormSubmitter.submitTopology(args[0], conf,
    topology.build());

            }
    }

    and log:

    2015-01-22T23:52:20.670+0200 o.a.s.z.ClientCnxn [INFO] Session
    establishment complete on server localhost/127.0.0.1:2181
    <http://127.0.0.1:2181>, sessionid = 0x14b12e178cd034c, negotiated
    timeout = 20000
    2015-01-22T23:52:20.671+0200 o.a.s.c.f.s.ConnectionStateManager
    [INFO] State change: CONNECTED
    2015-01-22T23:52:20.672+0200 b.s.zookeeper [INFO] Zookeeper state
    update: :connected:none
    2015-01-22T23:52:21.685+0200 o.a.s.z.ZooKeeper [INFO] Session:
    0x14b12e178cd034c closed
    2015-01-22T23:52:21.685+0200 o.a.s.z.ClientCnxn [INFO] EventThread
    shut down
    2015-01-22T23:52:21.687+0200
    b.s.u.StormBoundedExponentialBackoffRetry [INFO] The
    baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
    2015-01-22T23:52:21.687+0200 o.a.s.c.f.i.CuratorFrameworkImpl
    [INFO] Starting
    2015-01-22T23:52:21.687+0200 o.a.s.z.ZooKeeper [INFO] Initiating
    client connection, connectString=localhost:2181/storm
    sessionTimeout=20000
    watcher=org.apache.storm.curator.ConnectionState@9ca62e2
    2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Opening
    socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will
    not attempt to authenticate using SASL (unknown error)
    2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Socket
    connection established to localhost/0:0:0:0:0:0:0:1:2181,
    initiating session
    2015-01-22T23:52:21.694+0200 o.a.s.z.ClientCnxn [INFO] Session
    establishment complete on server localhost/0:0:0:0:0:0:0:1:2181,
    sessionid = 0x14b12e178cd034e, negotiated timeout = 20000
    2015-01-22T23:52:21.694+0200 o.a.s.c.f.s.ConnectionStateManager
    [INFO] State change: CONNECTED
    2015-01-22T23:52:21.718+0200 b.s.d.worker [INFO] Reading Assignments.
    2015-01-22T23:52:21.746+0200 b.s.m.TransportFactory [INFO] Storm
    peer transport plugin:backtype.storm.messaging.netty.Context
    2015-01-22T23:52:21.872+0200 o.a.s.c.r.ExponentialBackoffRetry
    [WARN] maxRetries too large (300). Pinning to 29
    2015-01-22T23:52:21.873+0200
    b.s.u.StormBoundedExponentialBackoffRetry [INFO] The
    baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
    2015-01-22T23:52:21.873+0200 b.s.m.n.Client [INFO] New Netty
    Client, connect to bigdata17, 6702, config: , buffer_size: 5242880
    2015-01-22T23:52:21.879+0200 o.a.s.c.r.ExponentialBackoffRetry
    [WARN] maxRetries too large (300). Pinning to 29
    2015-01-22T23:52:21.879+0200
    b.s.u.StormBoundedExponentialBackoffRetry [INFO] The
    baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
    2015-01-22T23:52:21.879+0200 b.s.m.n.Client [INFO] New Netty
    Client, connect to bigdata17, 6701, config: , buffer_size: 5242880
    2015-01-22T23:52:21.880+0200 o.a.s.c.r.ExponentialBackoffRetry
    [WARN] maxRetries too large (300). Pinning to 29
    2015-01-22T23:52:21.880+0200
    b.s.u.StormBoundedExponentialBackoffRetry [INFO] The
    baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
    2015-01-22T23:52:21.880+0200 b.s.m.n.Client [INFO] New Netty
    Client, connect to bigdata17, 6700, config: , buffer_size: 5242880
    2015-01-22T23:52:21.882+0200 b.s.m.n.Client [INFO] Reconnect
    started for Netty-Client-bigdata17/192.168.80.214:6702... [0]
    2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect
    started for Netty-Client-bigdata17/192.168.80.214:6701... [0]
    2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect
    started for Netty-Client-bigdata17/192.168.80.214:6700... [0]
    2015-01-22T23:52:22.001+0200 b.s.m.n.Client [INFO] Reconnect
    started for Netty-Client-bigdata17/192.168.80.214:6701... [1]
    2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect
    started for Netty-Client-bigdata17/192.168.80.214:6700... [1]
    2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect
    started for Netty-Client-bigdata17/192.168.80.214:6702... [1]
    2015-01-22T23:52:22.068+0200 b.s.d.executor [INFO] Loading
    executor __acker:[5 5]
    2015-01-22T23:52:22.073+0200 b.s.d.task [INFO] Emitting: __acker
    __system ["startup"]
    2015-01-22T23:52:22.074+0200 b.s.d.executor [INFO] Loaded executor
    tasks __acker:[5 5]
    2015-01-22T23:52:22.080+0200 b.s.d.executor [INFO] Timeouts
    disabled for executor __acker:[5 5]
    2015-01-22T23:52:22.081+0200 b.s.d.executor [INFO] Finished
    loading executor __acker:[5 5]
    2015-01-22T23:52:22.087+0200 b.s.d.executor [INFO] Preparing bolt
    __acker:(5)
    2015-01-22T23:52:22.088+0200 b.s.d.executor [INFO] Loading
    executor spout0:[9 9]
    2015-01-22T23:52:22.091+0200 b.s.d.executor [INFO] Prepared bolt
    __acker:(5)
    2015-01-22T23:52:22.104+0200 b.s.m.n.Client [INFO] Reconnect
    started for Netty-Client-bigdata17/192.168.80.214:6700... [2]
    2015-01-22T23:52:22.104+0200 b.s.d.worker [ERROR] Error on
    initialization of server mk-worker
    *java.lang.RuntimeException: java.io.InvalidClassException:
    storm.kafka.KafkaConfig; local class incompatible: stream
    classdesc serialVersionUID = 1806199026298360819, local class
    serialVersionUID = 862253719916491316*
            at
    
backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56)
    ~[storm-core-0.9.3.jar:0.9.3]
            at backtype.storm.utils.Utils.deserialize(Utils.java:89)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.daemon.task$get_task_object.invoke(task.clj:73)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.util$assoc_apply_self.invoke(util.clj:850)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173)
    ~[storm-core-0.9.3.jar:0.9.3]
            at backtype.storm.daemon.task$mk_task.invoke(task.clj:184)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323)
    ~[storm-core-0.9.3.jar:0.9.3]
            at clojure.core$map$fn__4207.invoke(core.clj:2485)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.LazySeq.sval(LazySeq.java:42)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.LazySeq.seq(LazySeq.java:60)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
            at clojure.core$seq.invoke(core.clj:133)
    ~[clojure-1.5.1.jar:na]
            at
    clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    ~[clojure-1.5.1.jar:na]
            at
    clojure.core.protocols$fn__6026.invoke(protocols.clj:54)
    ~[clojure-1.5.1.jar:na]
            at
    clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13)
    ~[clojure-1.5.1.jar:na]
            at clojure.core$reduce.invoke(core.clj:6177)
    ~[clojure-1.5.1.jar:na]
            at clojure.core$into.invoke(core.clj:6229)
    ~[clojure-1.5.1.jar:na]
            at
    backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323) 
~[storm-core-0.9.3.jar:0.9.3]
            at
    
backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382)
    ~[storm-core-0.9.3.jar:0.9.3]
            at clojure.lang.LazySeq.sval(LazySeq.java:42)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.LazySeq.seq(LazySeq.java:60)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.Cons.next(Cons.java:39)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.LazySeq.next(LazySeq.java:92)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.RT.next(RT.java:598) ~[clojure-1.5.1.jar:na]
            at clojure.core$next.invoke(core.clj:64)
    ~[clojure-1.5.1.jar:na]
            at clojure.core$dorun.invoke(core.clj:2781)
    ~[clojure-1.5.1.jar:na]
            at clojure.core$doall.invoke(core.clj:2796)
    ~[clojure-1.5.1.jar:na]
            at
    
backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382)
    ~[storm-core-0.9.3.jar:0.9.3]
            at clojure.lang.AFn.applyToHelper(AFn.java:185)
    [clojure-1.5.1.jar:na]
            at clojure.lang.AFn.applyTo(AFn.java:151)
    [clojure-1.5.1.jar:na]
            at clojure.core$apply.invoke(core.clj:617)
    ~[clojure-1.5.1.jar:na]
            at
    
backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354)
    [storm-core-0.9.3.jar:0.9.3]
            at clojure.lang.RestFn.invoke(RestFn.java:512)
    [clojure-1.5.1.jar:na]
            at
    backtype.storm.daemon.worker$_main.invoke(worker.clj:461)
    [storm-core-0.9.3.jar:0.9.3]
            at clojure.lang.AFn.applyToHelper(AFn.java:172)
    [clojure-1.5.1.jar:na]
            at clojure.lang.AFn.applyTo(AFn.java:151)
    [clojure-1.5.1.jar:na]
            at backtype.storm.daemon.worker.main(Unknown Source)
    [storm-core-0.9.3.jar:0.9.3]

    Margus (margusja) Roo
    http://margus.roo.ee
    skype: margusja
    +372 51 480

    On 22/01/15 20:10, Margus Roo wrote:
    Hi

    2015-01-22T20:06:06.175+0200
    b.s.u.StormBoundedExponentialBackoffRetry [INFO] The
    baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
    2015-01-22T20:06:06.175+0200 o.a.s.c.f.i.CuratorFrameworkImpl
    [INFO] Starting
    2015-01-22T20:06:06.175+0200 o.a.s.z.ZooKeeper [INFO] Initiating
    client connection, connectString=localhost:2181/storm
    sessionTimeout=20000
    watcher=org.apache.storm.curator.ConnectionState@5c23f9fd
    2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Opening
    socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will
    not attempt to authenticate using SASL (unknown error)
    2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Socket
    connection established to localhost/0:0:0:0:0:0:0:1:2181,
    initiating session
    2015-01-22T20:06:06.182+0200 o.a.s.z.ClientCnxn [INFO] Session
    establishment complete on server localhost/0:0:0:0:0:0:0:1:2181,
    sessionid = 0x14b016362340b5f, negotiated timeout = 20000
    2015-01-22T20:06:06.182+0200 o.a.s.c.f.s.ConnectionStateManager
    [INFO] State change: CONNECTED
    2015-01-22T20:06:06.201+0200 b.s.d.worker [INFO] Reading Assignments.
    2015-01-22T20:06:06.226+0200 b.s.m.TransportFactory [INFO] Storm
    peer transport plugin:backtype.storm.messaging.netty.Context
    2015-01-22T20:06:06.318+0200 o.a.s.c.r.ExponentialBackoffRetry
    [WARN] maxRetries too large (300). Pinning to 29
    2015-01-22T20:06:06.318+0200
    b.s.u.StormBoundedExponentialBackoffRetry [INFO] The
    baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
    2015-01-22T20:06:06.318+0200 b.s.m.n.Client [INFO] New Netty
    Client, connect to bigdata17, 6703, config: , buffer_size: 5242880
    2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] Reconnect
    started for Netty-Client-bigdata17/192.168.80.214:6703... [0]
    2015-01-22T20:06:06.323+0200 o.a.s.c.r.ExponentialBackoffRetry
    [WARN] maxRetries too large (300). Pinning to 29
    2015-01-22T20:06:06.323+0200
    b.s.u.StormBoundedExponentialBackoffRetry [INFO] The
    baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
    2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] New Netty
    Client, connect to bigdata17, 6702, config: , buffer_size: 5242880
    2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect
    started for Netty-Client-bigdata17/192.168.80.214:6702... [0]
    2015-01-22T20:06:06.324+0200 o.a.s.c.r.ExponentialBackoffRetry
    [WARN] maxRetries too large (300). Pinning to 29
    2015-01-22T20:06:06.324+0200
    b.s.u.StormBoundedExponentialBackoffRetry [INFO] The
    baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
    2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] New Netty
    Client, connect to bigdata17, 6701, config: , buffer_size: 5242880
    2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect
    started for Netty-Client-bigdata17/192.168.80.214:6701... [0]
    2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection
    established to a remote host
    Netty-Client-bigdata17/192.168.80.214:6702
    <http://192.168.80.214:6702>, [id: 0xffc67e3c,
    /192.168.80.214:55264 <http://192.168.80.214:55264> =>
    bigdata17/192.168.80.214:6702 <http://192.168.80.214:6702>]
    2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection
    established to a remote host
    Netty-Client-bigdata17/192.168.80.214:6701
    <http://192.168.80.214:6701>, [id: 0x54e2a4e9,
    /192.168.80.214:55748 <http://192.168.80.214:55748> =>
    bigdata17/192.168.80.214:6701 <http://192.168.80.214:6701>]
    2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection
    established to a remote host
    Netty-Client-bigdata17/192.168.80.214:6703
    <http://192.168.80.214:6703>, [id: 0xc78c04d0,
    /192.168.80.214:42319 <http://192.168.80.214:42319> =>
    bigdata17/192.168.80.214:6703 <http://192.168.80.214:6703>]
    2015-01-22T20:06:06.503+0200 b.s.d.executor [INFO] Loading
    executor spout:[5 5]
    2015-01-22T20:06:06.517+0200 b.s.d.worker [ERROR] Error on
    initialization of server mk-worker
    java.lang.RuntimeException: java.io.InvalidClassException:
    storm.kafka.KafkaConfig; local class incompatible: stream
    classdesc serialVersionUID = 1806199026298360819, local class
    serialVersionUID = 862253719916491316
            at
    
backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56)
    ~[storm-core-0.9.3.jar:0.9.3]
            at backtype.storm.utils.Utils.deserialize(Utils.java:89)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.daemon.task$get_task_object.invoke(task.clj:73)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.util$assoc_apply_self.invoke(util.clj:850)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173)
    ~[storm-core-0.9.3.jar:0.9.3]
    604,1         91%
            at
    backtype.storm.util$assoc_apply_self.invoke(util.clj:850)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.daemon.task$mk_task.invoke(task.clj:184)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323)
    ~[storm-core-0.9.3.jar:0.9.3]
            at clojure.core$map$fn__4207.invoke(core.clj:2485)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.LazySeq.sval(LazySeq.java:42)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.LazySeq.seq(LazySeq.java:60)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
            at clojure.core$seq.invoke(core.clj:133)
    ~[clojure-1.5.1.jar:na]
            at
    clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    ~[clojure-1.5.1.jar:na]
            at
    clojure.core.protocols$fn__6026.invoke(protocols.clj:54)
    ~[clojure-1.5.1.jar:na]
            at
    clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13)
    ~[clojure-1.5.1.jar:na]
            at clojure.core$reduce.invoke(core.clj:6177)
    ~[clojure-1.5.1.jar:na]
            at clojure.core$into.invoke(core.clj:6229)
    ~[clojure-1.5.1.jar:na]
            at
    backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323)
    ~[storm-core-0.9.3.jar:0.9.3]
            at
    
backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382)
    ~[storm-core-0.9.3.jar:0.9.3]
            at clojure.lang.LazySeq.sval(LazySeq.java:42)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.LazySeq.seq(LazySeq.java:60)
    ~[clojure-1.5.1.jar:na]
            at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
            at clojure.core$seq.invoke(core.clj:133)
    ~[clojure-1.5.1.jar:na]
            at clojure.core$dorun.invoke(core.clj:2780)
    ~[clojure-1.5.1.jar:na]
            at clojure.core$doall.invoke(core.clj:2796)
    ~[clojure-1.5.1.jar:na]
            at
    
backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382)
    ~[storm-core-0.9.3.jar:0.9.3]
            at clojure.lang.AFn.applyToHelper(AFn.java:185)
    [clojure-1.5.1.jar:na]
            at clojure.lang.AFn.applyTo(AFn.java:151)
    [clojure-1.5.1.jar:na]
            at clojure.core$apply.invoke(core.clj:617)
    ~[clojure-1.5.1.jar:na]
            at
    
backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354)
    [storm-core-0.9.3.jar:0.9.3]
            at clojure.lang.RestFn.invoke(RestFn.java:512)
    [clojure-1.5.1.jar:na]
            at
    backtype.storm.daemon.worker$_main.invoke(worker.clj:461)
    [storm-core-0.9.3.jar:0.9.3]
            at clojure.lang.AFn.applyToHelper(AFn.java:172)
    [clojure-1.5.1.jar:na]
            at clojure.lang.AFn.applyTo(AFn.java:151)
    [clojure-1.5.1.jar:na]
            at backtype.storm.daemon.worker.main(Unknown Source)
    [storm-core-0.9.3.jar:0.9.3]
    *Caused by: java.io.InvalidClassException:
    storm.kafka.KafkaConfig; local class incompatible: stream
    classdesc serialVersionUID = 1806199026298360819, local class
    serialVersionUID = 862253719916491316*


    I am using storm numbus and supervisor 0.9.3

    build topology using:
        <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
            <scope>provided</scope>
        </dependency>
            <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2-beta</version>
            </dependency>

    there is no bolt only one spout:

    public class Topology {
          public static void main(String[] args) throws Exception {
                Config conf = new Config();
                conf.setDebug(false);

                TopologyBuilder builder = new TopologyBuilder();

                Broker brokerForPartition0 = new Broker("bigdata14");
                GlobalPartitionInformation partitionInfo = new
    GlobalPartitionInformation();
                partitionInfo.addPartition(0, brokerForPartition0);
                StaticHosts hosts = new StaticHosts(partitionInfo);

                SpoutConfig spoutConfig = new SpoutConfig(hosts,
                        "demo",
                        "/",  // zookeeper root path for offset storing
                        "stormdemo");
                        KafkaSpout kafkaSpout = new
    KafkaSpout(spoutConfig);

                builder.setSpout("spout", kafkaSpout);

                if (args.length > 0) {
                    // if you wish to run your job on a remote cluster
                    conf.setNumWorkers(4);
                    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
    conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
                    conf.put(Config.WORKER_CHILDOPTS , "-Xmx4096m");
    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS , "-Xmx4096m");
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING  , 2);

    StormSubmitter.submitTopology(args[0], conf,
    builder.createTopology());
                } else {
                    // if you wish to run and test your job locally
                    conf.setMaxTaskParallelism(1);

                    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafkatopology", conf,
    builder.createTopology());
                    Thread.sleep(10000);
                    cluster.shutdown();
                }
            }
    }


    Any ideas?
-- Margus (margusja) Roo
    http://margus.roo.ee
    skype: margusja
    +372 51 480



Reply via email to