I simplified my code and I still get the same problem.

Code:

public class Topology {

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(true);

        TridentTopology topology = new TridentTopology();
        BrokerHosts zk = new ZkHosts("bigdata14:2181");

        TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "demo");
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

        Stream spoutStream = topology.newStream("kafka-stream", spout);
        spoutStream.broadcast();

        // run the job on a remote cluster
        conf.setNumWorkers(4);
        conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
        conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
        conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 512);

        StormSubmitter.submitTopology(args[0], conf, topology.build());
    }
}

And the log:

2015-01-22T23:52:20.670+0200 o.a.s.z.ClientCnxn [INFO] Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x14b12e178cd034c, negotiated timeout = 20000
2015-01-22T23:52:20.671+0200 o.a.s.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2015-01-22T23:52:20.672+0200 b.s.zookeeper [INFO] Zookeeper state update: :connected:none
2015-01-22T23:52:21.685+0200 o.a.s.z.ZooKeeper [INFO] Session: 0x14b12e178cd034c closed
2015-01-22T23:52:21.685+0200 o.a.s.z.ClientCnxn [INFO] EventThread shut down
2015-01-22T23:52:21.687+0200 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
2015-01-22T23:52:21.687+0200 o.a.s.c.f.i.CuratorFrameworkImpl [INFO] Starting
2015-01-22T23:52:21.687+0200 o.a.s.z.ZooKeeper [INFO] Initiating client connection, connectString=localhost:2181/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@9ca62e2
2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
2015-01-22T23:52:21.688+0200 o.a.s.z.ClientCnxn [INFO] Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
2015-01-22T23:52:21.694+0200 o.a.s.z.ClientCnxn [INFO] Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x14b12e178cd034e, negotiated timeout = 20000
2015-01-22T23:52:21.694+0200 o.a.s.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2015-01-22T23:52:21.718+0200 b.s.d.worker [INFO] Reading Assignments.
2015-01-22T23:52:21.746+0200 b.s.m.TransportFactory [INFO] Storm peer transport plugin:backtype.storm.messaging.netty.Context
2015-01-22T23:52:21.872+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (300). Pinning to 29
2015-01-22T23:52:21.873+0200 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
2015-01-22T23:52:21.873+0200 b.s.m.n.Client [INFO] New Netty Client, connect to bigdata17, 6702, config: , buffer_size: 5242880
2015-01-22T23:52:21.879+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (300). Pinning to 29
2015-01-22T23:52:21.879+0200 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
2015-01-22T23:52:21.879+0200 b.s.m.n.Client [INFO] New Netty Client, connect to bigdata17, 6701, config: , buffer_size: 5242880
2015-01-22T23:52:21.880+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (300). Pinning to 29
2015-01-22T23:52:21.880+0200 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
2015-01-22T23:52:21.880+0200 b.s.m.n.Client [INFO] New Netty Client, connect to bigdata17, 6700, config: , buffer_size: 5242880
2015-01-22T23:52:21.882+0200 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-bigdata17/192.168.80.214:6702... [0]
2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-bigdata17/192.168.80.214:6701... [0]
2015-01-22T23:52:21.888+0200 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-bigdata17/192.168.80.214:6700... [0]
2015-01-22T23:52:22.001+0200 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-bigdata17/192.168.80.214:6701... [1]
2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-bigdata17/192.168.80.214:6700... [1]
2015-01-22T23:52:22.002+0200 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-bigdata17/192.168.80.214:6702... [1]
2015-01-22T23:52:22.068+0200 b.s.d.executor [INFO] Loading executor __acker:[5 5]
2015-01-22T23:52:22.073+0200 b.s.d.task [INFO] Emitting: __acker __system ["startup"]
2015-01-22T23:52:22.074+0200 b.s.d.executor [INFO] Loaded executor tasks __acker:[5 5]
2015-01-22T23:52:22.080+0200 b.s.d.executor [INFO] Timeouts disabled for executor __acker:[5 5]
2015-01-22T23:52:22.081+0200 b.s.d.executor [INFO] Finished loading executor __acker:[5 5]
2015-01-22T23:52:22.087+0200 b.s.d.executor [INFO] Preparing bolt __acker:(5)
2015-01-22T23:52:22.088+0200 b.s.d.executor [INFO] Loading executor spout0:[9 9]
2015-01-22T23:52:22.091+0200 b.s.d.executor [INFO] Prepared bolt __acker:(5)
2015-01-22T23:52:22.104+0200 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-bigdata17/192.168.80.214:6700... [2]
2015-01-22T23:52:22.104+0200 b.s.d.worker [ERROR] Error on initialization of server mk-worker
*java.lang.RuntimeException: java.io.InvalidClassException: storm.kafka.KafkaConfig; local class incompatible: stream classdesc serialVersionUID = 1806199026298360819, local class serialVersionUID = 862253719916491316*
        at backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.Utils.deserialize(Utils.java:89) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$assoc_apply_self.invoke(util.clj:850) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.task$mk_task.invoke(task.clj:184) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.core$map$fn__4207.invoke(core.clj:2485) ~[clojure-1.5.1.jar:na]
        at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
        at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
        at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
        at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
        at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) ~[clojure-1.5.1.jar:na]
        at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) ~[clojure-1.5.1.jar:na]
        at clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) ~[clojure-1.5.1.jar:na]
        at clojure.core$reduce.invoke(core.clj:6177) ~[clojure-1.5.1.jar:na]
        at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
        at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
        at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
        at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.5.1.jar:na]
        at clojure.lang.LazySeq.next(LazySeq.java:92) ~[clojure-1.5.1.jar:na]
        at clojure.lang.RT.next(RT.java:598) ~[clojure-1.5.1.jar:na]
        at clojure.core$next.invoke(core.clj:64) ~[clojure-1.5.1.jar:na]
        at clojure.core$dorun.invoke(core.clj:2781) ~[clojure-1.5.1.jar:na]
        at clojure.core$doall.invoke(core.clj:2796) ~[clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.applyToHelper(AFn.java:185) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.3.jar:0.9.3]
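For reference, the mismatched serialVersionUID means the worker's classpath carries a different build of storm.kafka.KafkaConfig than the one the topology jar was serialized with. A small diagnostic sketch (the class name and `CheckSerialVersion` are just illustrative, not part of my topology) that prints the UID the local classpath would compute, so each candidate jar can be checked:

```java
import java.io.ObjectStreamClass;

public class CheckSerialVersion {

    // Returns the serialVersionUID the local JVM associates with a class:
    // either the declared one, or the default computed from the class shape.
    static long uidOf(Class<?> c) {
        return ObjectStreamClass.lookup(c).getSerialVersionUID();
    }

    public static void main(String[] args) throws Exception {
        // e.g. run with "storm.kafka.KafkaConfig" and each storm-kafka jar
        // on the classpath in turn to see which build carries which UID
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " serialVersionUID = " + uidOf(Class.forName(name)));
    }
}
```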

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 22/01/15 20:10, Margus Roo wrote:
Hi

2015-01-22T20:06:06.175+0200 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
2015-01-22T20:06:06.175+0200 o.a.s.c.f.i.CuratorFrameworkImpl [INFO] Starting
2015-01-22T20:06:06.175+0200 o.a.s.z.ZooKeeper [INFO] Initiating client connection, connectString=localhost:2181/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@5c23f9fd
2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
2015-01-22T20:06:06.176+0200 o.a.s.z.ClientCnxn [INFO] Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
2015-01-22T20:06:06.182+0200 o.a.s.z.ClientCnxn [INFO] Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x14b016362340b5f, negotiated timeout = 20000
2015-01-22T20:06:06.182+0200 o.a.s.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2015-01-22T20:06:06.201+0200 b.s.d.worker [INFO] Reading Assignments.
2015-01-22T20:06:06.226+0200 b.s.m.TransportFactory [INFO] Storm peer transport plugin:backtype.storm.messaging.netty.Context
2015-01-22T20:06:06.318+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (300). Pinning to 29
2015-01-22T20:06:06.318+0200 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
2015-01-22T20:06:06.318+0200 b.s.m.n.Client [INFO] New Netty Client, connect to bigdata17, 6703, config: , buffer_size: 5242880
2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-bigdata17/192.168.80.214:6703... [0]
2015-01-22T20:06:06.323+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (300). Pinning to 29
2015-01-22T20:06:06.323+0200 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
2015-01-22T20:06:06.323+0200 b.s.m.n.Client [INFO] New Netty Client, connect to bigdata17, 6702, config: , buffer_size: 5242880
2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-bigdata17/192.168.80.214:6702... [0]
2015-01-22T20:06:06.324+0200 o.a.s.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (300). Pinning to 29
2015-01-22T20:06:06.324+0200 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] New Netty Client, connect to bigdata17, 6701, config: , buffer_size: 5242880
2015-01-22T20:06:06.324+0200 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-bigdata17/192.168.80.214:6701... [0]
2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-bigdata17/192.168.80.214:6702, [id: 0xffc67e3c, /192.168.80.214:55264 => bigdata17/192.168.80.214:6702]
2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-bigdata17/192.168.80.214:6701, [id: 0x54e2a4e9, /192.168.80.214:55748 => bigdata17/192.168.80.214:6701]
2015-01-22T20:06:06.339+0200 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-bigdata17/192.168.80.214:6703, [id: 0xc78c04d0, /192.168.80.214:42319 => bigdata17/192.168.80.214:6703]
2015-01-22T20:06:06.503+0200 b.s.d.executor [INFO] Loading executor spout:[5 5]
2015-01-22T20:06:06.517+0200 b.s.d.worker [ERROR] Error on initialization of server mk-worker
java.lang.RuntimeException: java.io.InvalidClassException: storm.kafka.KafkaConfig; local class incompatible: stream classdesc serialVersionUID = 1806199026298360819, local class serialVersionUID = 862253719916491316
        at backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.Utils.deserialize(Utils.java:89) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.task$mk_task_data$fn__3131.invoke(task.clj:180) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$assoc_apply_self.invoke(util.clj:850) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.task$mk_task.invoke(task.clj:184) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.executor$mk_executor$fn__3310.invoke(executor.clj:323) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.core$map$fn__4207.invoke(core.clj:2485) ~[clojure-1.5.1.jar:na]
        at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
        at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
        at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
        at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
        at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) ~[clojure-1.5.1.jar:na]
        at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) ~[clojure-1.5.1.jar:na]
        at clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) ~[clojure-1.5.1.jar:na]
        at clojure.core$reduce.invoke(core.clj:6177) ~[clojure-1.5.1.jar:na]
        at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
        at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:323) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744$iter__3749__3753$fn__3754.invoke(worker.clj:382) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
        at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
        at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
        at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
        at clojure.core$dorun.invoke(core.clj:2780) ~[clojure-1.5.1.jar:na]
        at clojure.core$doall.invoke(core.clj:2796) ~[clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__3743$exec_fn__1108__auto____3744.invoke(worker.clj:382) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.applyToHelper(AFn.java:185) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__3743$mk_worker__3799.doInvoke(worker.clj:354) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.3.jar:0.9.3]
*Caused by: java.io.InvalidClassException: storm.kafka.KafkaConfig; local class incompatible: stream classdesc serialVersionUID = 1806199026298360819, local class serialVersionUID = 862253719916491316*


I am using Storm nimbus and supervisor 0.9.3.

I build the topology using:
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.3</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2-beta</version>
    </dependency>
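(A common cause of this particular InvalidClassException is the topology jar and the worker classpath carrying different storm-kafka builds, e.g. a stray storm-kafka jar under the supervisor's lib directory versus the one bundled at build time. Assuming the topology jar is meant to bundle storm-kafka itself, a minimal maven-shade-plugin sketch for the pom's `<build><plugins>` section would look like the following; the version shown is just one known release, not necessarily what this project uses:)

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.3</version>
    <executions>
        <execution>
            <!-- build an uber-jar at package time so the submitted
                 topology carries its own storm-kafka classes -->
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```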

There is no bolt, only one spout:

public class Topology {

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(false);

        TopologyBuilder builder = new TopologyBuilder();

        Broker brokerForPartition0 = new Broker("bigdata14");
        GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
        partitionInfo.addPartition(0, brokerForPartition0);
        StaticHosts hosts = new StaticHosts(partitionInfo);

        SpoutConfig spoutConfig = new SpoutConfig(hosts,
                "demo",
                "/",  // ZooKeeper root path for offset storage
                "stormdemo");
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        builder.setSpout("spout", kafkaSpout);

        if (args.length > 0) {
            // run the job on a remote cluster
            conf.setNumWorkers(4);
            conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
            conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
            conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
            conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // run and test the job locally
            conf.setMaxTaskParallelism(1);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafkatopology", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}


Any ideas?
