I was able to verify this to be a bug in how worker hooks work in local mode.

In trying to see whether this affects distributed mode as well, I found a more 
serious issue that prevents workers from shutting down gracefully (and thus 
prevents shutdown hooks from running):

https://issues.apache.org/jira/browse/STORM-2176

So for the time being I don’t believe worker shutdown hooks work in either 
local or distributed mode. I can confirm the start portion of worker hooks 
functions properly, but not shutdown. Hopefully we will be able to fix both 
these issues in an upcoming release.
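
For reference, the EOFException in the quoted trace is what plain Java deserialization produces when it is handed an empty byte array. The sketch below is only an illustration using the JDK, not Storm's actual code: the class name and both helper methods are hypothetical, and it assumes the failing call behaves like a standard ObjectInputStream read that wraps checked exceptions in a RuntimeException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class EmptyDeserializeDemo {

    // Hypothetical helper: serialize an object with standard Java serialization.
    static byte[] javaSerialize(Object obj) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Hypothetical helper: deserialize, wrapping checked exceptions in a
    // RuntimeException (assumed to resemble what the trace shows).
    static Object javaDeserialize(byte[] serialized) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // A normal round trip succeeds.
        Object copy = javaDeserialize(javaSerialize("hook"));
        System.out.println("round trip: " + copy);

        // An empty byte[] fails while the stream header is being read.
        try {
            javaDeserialize(new byte[0]);
        } catch (RuntimeException e) {
            System.out.println("empty input: " + e.getCause().getClass().getName());
        }
    }
}
```

If the serialized hook bytes are empty when the shutdown path reads them, this is the failure one would expect to see, which matches the symptom Kevin describes.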

-Taylor


> On Oct 21, 2016, at 9:58 AM, Kevin Peek <kp...@salesforce.com> wrote:
> 
> I am running into problems with WorkerHooks on a local cluster. Even using 
> only a BaseWorkerHook, I get an Exception. When I run the following code, an 
> EOFException is thrown - it seems the Worker is trying to deserialize an 
> empty byte[] for one of the WorkerHooks. Comment out the line adding the hook 
> and this runs fine.
> 
> Can someone help me understand what is going wrong here, and whether this is 
> strictly an issue with LocalCluster and how I am using it?
> 
> 
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("spoutId", new RandomNumberSpout());
> builder.addWorkerHook(new BaseWorkerHook());
> StormTopology topology = builder.createTopology();
> Config config = new Config();
> config.setMessageTimeoutSecs(1);
> String topologyName = "dummy-topology";
> 
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology(topologyName, config, topology);
> Thread.sleep(5000);
> cluster.killTopology(topologyName);
> Thread.sleep(10000);
> cluster.shutdown();
> 
> 
> Produces:
> 
> 
> java.lang.RuntimeException: java.io.EOFException
>       at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:185)
>       at org.apache.storm.daemon.worker$run_worker_shutdown_hooks$iter__8540__8544$fn__8545.invoke(worker.clj:576)
>       at clojure.lang.LazySeq.sval(LazySeq.java:40)
>       at clojure.lang.LazySeq.seq(LazySeq.java:49)
>       at clojure.lang.RT.seq(RT.java:507)
>       at clojure.core$seq__4128.invoke(core.clj:137)
>       at clojure.core$dorun.invoke(core.clj:3009)
>       at clojure.core$doall.invoke(core.clj:3025)
>       at org.apache.storm.daemon.worker$run_worker_shutdown_hooks.invoke(worker.clj:574)
>       at org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify__8557$shutdown_STAR___8577.invoke(worker.clj:691)
>       at org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify$reify__8603.shutdown(worker.clj:704)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
>       at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
>       at org.apache.storm.process_simulator$kill_process.invoke(process_simulator.clj:46)
>       at org.apache.storm.daemon.supervisor$shutdown_worker.invoke(supervisor.clj:286)
>       at org.apache.storm.daemon.supervisor$fn__9307$exec_fn__2466__auto__$reify__9332.shutdown_all_workers(supervisor.clj:852)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
>       at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
>       at org.apache.storm.testing$kill_local_storm_cluster.invoke(testing.clj:199)
>       at org.apache.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:66)
>       at org.apache.storm.LocalCluster.shutdown(Unknown Source)
