I was able to verify this to be a bug in how worker hooks work in local mode.
In trying to see if this affects distributed mode as well, a found a more serious issue that prevents workers from shutting down gracefully (an thus preventing shutdown hooks from running): https://issues.apache.org/jira/browse/STORM-2176 <https://issues.apache.org/jira/browse/STORM-2176> So for the time being I don’t believe worker shutdown hooks work in either local or distributed mode. I can confirm the start portion of worker hooks functions properly, but not shutdown. Hopefully we will be able to fix both these issues in an upcoming release. -Taylor > On Oct 21, 2016, at 9:58 AM, Kevin Peek <kp...@salesforce.com> wrote: > > I am running into problems with WorkerHooks on a local cluster. Even using > only a BaseWorkerHook, I get an Exception. When I run the following code, an > EOFException is thrown - it seems the Worker is trying to deserialize an > empty byte[] for one of the WorkerHooks. Comment out the line adding the hook > and this runs fine. > > Can someone help me understand what is going wrong here and whether or not > this is strictly an issue with the LocalCluster and how I am using it. > > > TopologyBuilder builder = new TopologyBuilder(); > builder.setSpout("spoutId", new RandomNumberSpout()); > builder.addWorkerHook(new BaseWorkerHook()); > StormTopology topology = builder.createTopology(); > Config config = new Config(); > config.setMessageTimeoutSecs(1); > String topologyName = "dummy-topology"; > > LocalCluster cluster = new LocalCluster(); > cluster.submitTopology(topologyName, config, topology); > Thread.sleep(5000); > cluster.killTopology(topologyName); > Thread.sleep(10000); > cluster.shutdown(); > > > Produces: > > > java.lang.RuntimeException: java.io.EOFException > > at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:185) > at > org.apache.storm.daemon.worker$run_worker_shutdown_hooks$iter__8540__8544$fn__8545.invoke(worker.clj:576) > at clojure.lang.LazySeq.sval(LazySeq.java:40) > at clojure.lang.LazySeq.seq(LazySeq.java:49) > at clojure.lang.RT.seq(RT.java:507) > at clojure.core$seq__4128.invoke(core.clj:137) > at clojure.core$dorun.invoke(core.clj:3009) > at clojure.core$doall.invoke(core.clj:3025) > at > org.apache.storm.daemon.worker$run_worker_shutdown_hooks.invoke(worker.clj:574) > at > org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify__8557$shutdown_STAR___8577.invoke(worker.clj:691) > at > org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify$reify__8603.shutdown(worker.clj:704) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) > at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) > at > org.apache.storm.process_simulator$kill_process.invoke(process_simulator.clj:46) > at > org.apache.storm.daemon.supervisor$shutdown_worker.invoke(supervisor.clj:286) > at > org.apache.storm.daemon.supervisor$fn__9307$exec_fn__2466__auto__$reify__9332.shutdown_all_workers(supervisor.clj:852) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) > at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) > at > org.apache.storm.testing$kill_local_storm_cluster.invoke(testing.clj:199) > at org.apache.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:66) > at org.apache.storm.LocalCluster.shutdown(Unknown Source)
signature.asc
Description: Message signed with OpenPGP using GPGMail