Hi All,
I'm having trouble getting my Trident topology to process batches reliably when using the OpaqueTridentKafkaSpout.

Local Mode
In local mode, I see behavior similar to [1]: the topology runs through 1-8 batches, then slows to a stop, and I eventually get messages starting with:

633053 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] WARN org.apache.zookeeper.server.NIOServerCnxn - EndOfStreamException: Unable to read additional data from client sessionid 0x145dcad612b001b, likely client has closed socket

then:

641356 [Thread-38-spout0-EventThread] INFO com.netflix.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
641358 [ConnectionStateManager-0] WARN com.netflix.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.

And lastly:

675253 [main-EventThread] WARN backtype.storm.cluster - Received event :disconnected::none: with disconnected Zookeeper.
677965 [main-EventThread] INFO com.netflix.curator.framework.state.ConnectionStateManager - State change: LOST
677965 [main-EventThread] WARN backtype.storm.cluster - Received event :expired::none: with disconnected Zookeeper.
684088 [SyncThread:0] ERROR org.apache.zookeeper.server.NIOServerCnxn - Unexpected Exception:
java.nio.channels.CancelledKeyException: null
    at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) ~[na:1.7.0_07]
    at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77) ~[na:1.7.0_07]
    at org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:418) [zookeeper-3.3.3.jar:3.3.3-1073969]
    at org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1509) [zookeeper-3.3.3.jar:3.3.3-1073969]
    at org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:171) [zookeeper-3.3.3.jar:3.3.3-1073969]
    at org.apache.zookeeper.server.SyncRequestProcessor.run(SyncRequestProcessor.java:135) [zookeeper-3.3.3.jar:3.3.3-1073969]

918289 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] WARN org.apache.zookeeper.server.NIOServerCnxn - EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket

(I can put more in a gist if necessary)
============================

It appears that the spout loses its connection to the ZooKeeper it uses for maintaining the transactional state, but I'm confused about how that happens when, in local mode, Storm uses the in-process ZooKeeper.

I have tried connecting to both Kafka 0.7 and 0.8 instances (using storm-kafka and the Wurstmeister jars, respectively), and both end the same way. Additionally, I've replaced the Kafka spout with a FixedBatchSpout, and that runs just fine, which is why I think this has something to do with my Kafka spout configuration.
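
For reference, the FixedBatchSpout stand-in looks roughly like this (field names and values are placeholders; the real test emits the same fields as my Kafka spout does):

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.testing.FixedBatchSpout;

public class FixedBatchTest {
    public static TridentTopology build() {
        // Placeholder tuples carrying the same field names the real spout emits.
        FixedBatchSpout spout = new FixedBatchSpout(
                new Fields("field1", "field2", "field3", "field4"), 3,
                new Values("a", "b", "c", "d"),
                new Values("e", "f", "g", "h"));
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", spout)
                .groupBy(new Fields("field1"));
        // ... same aggregations as the real topology from here on
        return topology;
    }
}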

As for my Kafka spout config, it's in JRuby, but it's a very basic config with one static broker host. The only non-standard thing is a custom Scheme (written in Java) that parses the Kafka message (which is JSON) and exposes its fields as named values in the tuple, so that I can do a groupBy immediately after a tuple comes off the spout. Using the same Scheme, the FixedBatchSpout works fine.
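
The Scheme is along these lines (field names are placeholders for the real ones):

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.List;

// Parses the JSON payload of each Kafka message and exposes its fields as
// named values, so the stream can be grouped straight off the spout.
public class JsonFieldScheme implements Scheme {

    @Override
    public List<Object> deserialize(byte[] ser) {
        try {
            JSONObject json = (JSONObject) JSONValue.parse(new String(ser, "UTF-8"));
            return Arrays.<Object>asList(
                    json.get("field1"),
                    json.get("field2"),
                    json.get("field3"),
                    json.get("field4"));
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("field1", "field2", "field3", "field4");
    }
}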

Cluster Mode

In cluster mode, I don't get the same crashes, but I do see in the log where a worker reconnects to ZooKeeper and the worker gets relaunched. The following occurs about 10 minutes into the deployment:

2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT 2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:host.name=stormsingle1 2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:java.version=1.7.0_55 2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:java.vendor=Oracle Corporation 2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:java.home=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.55.x86_64/jre 2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:java.class.path=/opt/storm/lib/junit-3.8.1.jar:/opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/carbonite-1.3.2.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/clojure-1.4.0.jar:/opt/storm/lib/kryo-2.17.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/commons-logging-1.1.1.jar:/opt/storm/lib/clj-stacktrace-0.2.4.jar:/opt/storm/lib/httpcore-4.1.jar:/opt/storm/lib/hiccup-0.3.6.jar:/opt/storm/lib/netty-3.6.3.Final.jar:/opt/storm/lib/jgrapht-core-0.9.0.jar:/opt/storm/lib/commons-exec-1.1.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/ring-servlet-0.3.11.jar:/opt/storm/lib/clj-time-0.4.1.jar:/opt/storm/lib/objenesis-1.2.jar:/opt/storm/lib/jline-2.11.jar:/opt/storm/lib/meat-locker-0.3.1.jar:/opt/storm/lib/logback-classic-1.0.6.jar:/opt/storm/lib/json-simple-1.1.jar:/opt/storm/lib/curator-framework-1.0.1.jar:/opt/storm/lib/commons-io-1.4.jar:/opt/storm/lib/curator-client-1.0.1.jar:/opt/storm/lib/guava-13.0.jar:/opt/storm/lib/storm-core-0.9.1-incubating.jar:/opt/storm/lib/commons-fileupload-1.2.1.jar:/opt/storm/lib/logback-core-1.0.6.jar:/opt/storm/lib/math.numeric-tower-0.0.1.jar:/opt/storm/lib/snakeyaml-1.11.jar:/opt/storm/lib/tools.cli-0.2.2.jar:/opt/storm/lib/core.incubator-0.1.0.jar:/opt/storm/lib/zookeeper-3.3.3.jar:/opt/storm/lib/httpclient-4.1.1.jar:/opt/storm/lib/clout-1.0.1.jar:/opt/storm/lib/jetty-6.1.26.jar:/opt/storm/lib/ring-devel-0.3.11.jar:/opt/storm/lib/asm-4.0.jar:/opt/storm/lib/ring-jetty-adapter-0.3.11.jar:/opt/storm/lib/commons-codec-1.4.jar:/opt/storm/lib/ring-core-1.1.5.jar:/opt/storm/lib/joda-time-2.0.jar:/opt/storm/lib/commons-lang-2.5.jar:/opt/storm/lib/reflectasm-1.07-shaded.jar:/opt/storm/lib/jetty-util-6.1.26.jar:/opt/storm/lib/disruptor-2.10.1.jar:/opt/storm/conf:/app/storm/supervisor/stormdist/gf_topology-2-1399643628/stormjar.jar 2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib 2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:java.io.tmpdir=/tmp 2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:java.compiler=<NA>
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:os.name=Linux
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:os.arch=amd64
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:os.version=2.6.32-431.el6.x86_64
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:user.name=storm
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:user.home=/home/storm
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:user.dir=/
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:host.name=stormsingle1 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.version=1.7.0_55 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.vendor=Oracle Corporation 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.home=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.55.x86_64/jre 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.class.path=/opt/storm/lib/junit-3.8.1.jar:/opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/carbonite-1.3.2.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/clojure-1.4.0.jar:/opt/storm/lib/kryo-2.17.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/commons-logging-1.1.1.jar:/opt/storm/lib/clj-stacktrace-0.2.4.jar:/opt/storm/lib/httpcore-4.1.jar:/opt/storm/lib/hiccup-0.3.6.jar:/opt/storm/lib/netty-3.6.3.Final.jar:/opt/storm/lib/jgrapht-core-0.9.0.jar:/opt/storm/lib/commons-exec-1.1.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/ring-servlet-0.3.11.jar:/opt/storm/lib/clj-time-0.4.1.jar:/opt/storm/lib/objenesis-1.2.jar:/opt/storm/lib/jline-2.11.jar:/opt/storm/lib/meat-locker-0.3.1.jar:/opt/storm/lib/logback-classic-1.0.6.jar:/opt/storm/lib/json-simple-1.1.jar:/opt/storm/lib/curator-framework-1.0.1.jar:/opt/storm/lib/commons-io-1.4.jar:/opt/storm/lib/curator-client-1.0.1.jar:/opt/storm/lib/guava-13.0.jar:/opt/storm/lib/storm-core-0.9.1-incubating.jar:/opt/storm/lib/commons-fileupload-1.2.1.jar:/opt/storm/lib/logback-core-1.0.6.jar:/opt/storm/lib/math.numeric-tower-0.0.1.jar:/opt/storm/lib/snakeyaml-1.11.jar:/opt/storm/lib/tools.cli-0.2.2.jar:/opt/storm/lib/core.incubator-0.1.0.jar:/opt/storm/lib/zookeeper-3.3.3.jar:/opt/storm/lib/httpclient-4.1.1.jar:/opt/storm/lib/clout-1.0.1.jar:/opt/storm/lib/jetty-6.1.26.jar:/opt/storm/lib/ring-devel-0.3.11.jar:/opt/storm/lib/asm-4.0.jar:/opt/storm/lib/ring-jetty-adapter-0.3.11.jar:/opt/storm/lib/commons-codec-1.4.jar:/opt/storm/lib/ring-core-1.1.5.jar:/opt/storm/lib/joda-time-2.0.jar:/opt/storm/lib/commons-lang-2.5.jar:/opt/storm/lib/reflectasm-1.07-shaded.jar:/opt/storm/lib/jetty-util-6.1.26.jar:/opt/storm/lib/disruptor-2.10.1.jar:/opt/storm/conf:/app/storm/supervisor/stormdist/gf_topology-2-1399643628/stormjar.jar 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.io.tmpdir=/tmp 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:java.compiler=<NA> 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:os.name=Linux 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:os.arch=amd64 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:os.version=2.6.32-431.el6.x86_64 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:user.name=storm 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:user.home=/home/storm 2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server environment:user.dir=/ 2014-05-09 14:15:53 b.s.d.worker [INFO] Launching worker for gf_topology-2-1399643628 on fca61061-ffad-41a2-8f93-8c87cbbe358b:6701 with 
id db592f15-cb7d-4fcd-8dff-9e2b4df3e973 and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m -Djava.net.preferIPv4Stack=true", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/app/storm", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "stormsingle1", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["stormsingle1"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" [6700 6701], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
"drpc.request.timeout.secs" 600, "storm.local.hostname" "stormsingle1", "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", "storm.cluster.mode" "distributed", "topology.optimize" true, "topology.max.task.parallelism" nil}
2014-05-09 14:15:53 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-05-09 14:15:53 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=stormsingle1:2181 sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@418d276b
2014-05-09 14:15:53 o.a.z.ClientCnxn [INFO] Opening socket connection to server stormsingle1/127.0.0.1:2181
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Socket connection established to stormsingle1/127.0.0.1:2181, initiating session
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Session establishment complete on server stormsingle1/127.0.0.1:2181, sessionid = 0x145dd2a3780004c, negotiated timeout = 20000
2014-05-09 14:15:54 b.s.zookeeper [INFO] Zookeeper state update: :connected:none
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] EventThread shut down
2014-05-09 14:15:54 o.a.z.ZooKeeper [INFO] Session: 0x145dd2a3780004c closed
2014-05-09 14:15:54 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-05-09 14:15:54 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=stormsingle1:2181/storm sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@36189ddf
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Opening socket connection to server stormsingle1/127.0.0.1:2181
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Socket connection established to stormsingle1/127.0.0.1:2181, initiating session
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Session establishment complete on server stormsingle1/127.0.0.1:2181, sessionid = 0x145dd2a3780004d, negotiated timeout = 20000

===================

I'm just running 2 workers, and the other one continues to run the same batch over and over. For example, here's a debug line:

2014-05-09 15:23:26 b.s.d.task [INFO] Emitting: spout0 s1 [2:67, **FIELD1**,**FIELD2**,**FIELD3**,**FIELD4**]

I'm assuming that translates to the 67th attempt at batch 2 (i.e., txid 2).

Questions

1. Why does my connection to ZooKeeper seem so tenuous? I've tried the
   in-process ZooKeeper (in local mode), a single-node cluster VM
   (where supervisors, workers, nimbus, and ZK all live on the same
   VM), and multi-node VMs (where nimbus and the supervisors are on
   separate VMs on the same host), both with ZK on one of those VMs
   and with a remote ZK cluster. I have made sure the /etc/hosts
   entries all point to the right things, which I know can be
   problematic.
2. I have a strong suspicion that my batches are taking too long to
   complete, and that the spout therefore replays the same ginormous
   batch over and over again. How can I tell the Kafka spout to take,
   say, 100 messages per batch? Or am I misunderstanding how batching
   works in Storm? (My spout setup, translated into Java, is sketched
   just after these questions in case it helps.)
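
In case it helps, here is roughly what the spout setup looks like translated from the JRuby into Java. The host and topic names are placeholders, and I've used ZkHosts here for brevity even though the real config points at a single static broker host. fetchSizeBytes is the only thing I can find that looks like a batch-size knob, and as far as I can tell it's bytes per partition rather than a message count:

import backtype.storm.Config;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;

public class SpoutSetup {
    public static OpaqueTridentKafkaSpout buildSpout() {
        // Placeholder ZK connect string and topic name.
        TridentKafkaConfig spoutConfig =
                new TridentKafkaConfig(new ZkHosts("kafkazk1:2181"), "my_topic");
        spoutConfig.scheme = new SchemeAsMultiScheme(new JsonFieldScheme());

        // As far as I can tell, this caps how much each partition contributes
        // to a batch, but it's bytes, not a message count (the default appears
        // to be 1 MB).
        spoutConfig.fetchSizeBytes = 64 * 1024;

        return new OpaqueTridentKafkaSpout(spoutConfig);
    }

    public static Config buildConf() {
        Config conf = new Config();
        // topology.max.spout.pending is currently nil in my config; with
        // Trident, setting it would cap the number of batches in flight, e.g.:
        // conf.setMaxSpoutPending(1);
        return conf;
    }
}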

Thanks all.
Dave K


[1] https://groups.google.com/d/msg/storm-user/9qTq-6P9bys/xuCG3pz0yPUJ
