I have a 4-node cluster (1 main, 3 supervisors) that is behaving strangely.
When node1 and node2 are running (1 supervisor each, 1 worker each), I can
successfully deploy and run my topology.  When I add node3 to the mix, the
worker on node3 dies as soon as it finishes loading the topology, while
worker1 and worker2 sit and wait for worker3.  This is the error:

2014-09-17 16:07:01 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-myhost.com/158.171.43.101:6703, [id: 0x6b960dca, /158.171.43.207:49389 => myhost.com/158.171.43.101:6703]
2014-09-17 16:07:01 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: clojure.lang.PersistentArrayMap cannot be cast to storm.trident.topology.TransactionAttempt
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]

If I run multiple workers on node1, or multiple workers on node2, or multiple
workers on both, the topology loads and runs successfully.  If I run just
node3 by itself (regardless of the number of workers), the topology also loads
and runs successfully.  However, any combination of node3 with any other node
fails on node3 with the error above, the moment the topology reaches node3.

I have trimmed the topology down to the most basic scaffolding, thinking that
perhaps there was a serialization issue (though I still don't know why it
would work on the other 2 nodes).  All supervisors start successfully with no
errors; likewise, nimbus and zookeeper show no errors.

Dummy topology:

def buildTopology() = {
  val topology = new TridentTopology()
  topology.newStream("ingestorSpout", new DummySpout)
  topology.build()
}

Dummy spout:

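import backtype.storm.task.TopologyContext
import backtype.storm.tuple.Fields
import storm.trident.operation.TridentCollector
import storm.trident.spout.IBatchSpout

class DummySpout extends IBatchSpout {
  import scala.collection.JavaConverters._

  override def open(conf: java.util.Map[_, _], context: TopologyContext): Unit = {
    println("opened dummy spout")
  }

  override def getOutputFields(): Fields = {
    println("dummy output fields")
    // Single output field named "dummystring"
    new Fields("dummystring")
  }

  override def emitBatch(batchId: Long, collector: TridentCollector): Unit = {
    // TridentCollector.emit takes a java.util.List[Object]
    collector.emit(List[AnyRef]("hello dummy world").asJava)
  }

  // Remaining IBatchSpout methods, left as no-ops for this dummy
  override def ack(batchId: Long): Unit = ()
  override def close(): Unit = ()
  override def getComponentConfiguration(): java.util.Map[_, _] = null
}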
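
For the serialization theory, a minimal local check could look like the sketch
below.  It round-trips an object through plain Java serialization, which is how
the spout object itself is shipped with the topology.  The names
SerializationCheck and roundTrip are hypothetical helpers, not part of Storm,
and this does not exercise the Kryo-based tuple serialization used between
workers, so a clean result here only rules out one possible cause.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper, not part of Storm.
object SerializationCheck {
  // Serialize obj with plain Java serialization and read it back.
  // Throws (e.g. NotSerializableException) if the object graph
  // contains something that cannot be serialized.
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Stand-in value; the real check would pass `new DummySpout` instead.
    val copy = roundTrip("hello dummy world")
    println(copy)
  }
}
```

Running the same check with the spout on node3 and on one of the working nodes
would show whether both load the class the same way.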

I have scoured the internet for this error and all its variations, and found
1 post with the exact error, but no resolution.  Hoping the community can help.

Thanks