Repository: storm
Updated Branches:
  refs/heads/master 26818f2b4 -> 8120e15ef


fix STORM-2979

Add in WorkerState an internal list of workerHooks and use it to
start/stop

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce7c716f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce7c716f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce7c716f

Branch: refs/heads/master
Commit: ce7c716fc55c9af792998c2b3d512018cdbd8bec
Parents: ffa607e
Author: michelo <michelo@michelo-desktop>
Authored: Sat Apr 7 18:13:12 2018 +0200
Committer: michelo <michelo@michelo-desktop>
Committed: Sat Apr 7 18:13:12 2018 +0200

----------------------------------------------------------------------
 .../apache/storm/daemon/worker/WorkerState.java | 28 +++++++++++++-------
 1 file changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ce7c716f/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java 
b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 9c7cf9e..b763c8d 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -63,7 +63,7 @@ import org.apache.storm.generated.StreamInfo;
 import org.apache.storm.generated.TopologyStatus;
 import org.apache.storm.grouping.Load;
 import org.apache.storm.grouping.LoadMapping;
-import org.apache.storm.hooks.BaseWorkerHook;
+import org.apache.storm.hooks.IWorkerHook;
 import org.apache.storm.messaging.ConnectionWithStatus;
 import org.apache.storm.messaging.DeserializingConnectionCallback;
 import org.apache.storm.messaging.IConnection;
@@ -95,6 +95,11 @@ public class WorkerState {
     final IContext mqContext;
     private final WorkerTransfer workerTransfer;
     private final BackPressureTracker bpTracker;
+    private final List<IWorkerHook> deserializedWorkerHooks;
+
+    public List<IWorkerHook> getDeserializedWorkerHooks() {
+        return deserializedWorkerHooks;
+    }
 
     public Map getConf() {
         return conf;
@@ -339,6 +344,15 @@ public class WorkerState {
         int maxTaskId = getMaxTaskId(componentToSortedTasks);
         this.workerTransfer = new WorkerTransfer(this, topologyConf, 
maxTaskId);
         this.bpTracker = new BackPressureTracker(workerId, localTaskIds);
+        // 
+        this.deserializedWorkerHooks = new ArrayList<>();
+        if (topology.is_set_worker_hooks()) {
+            for (ByteBuffer hook : topology.get_worker_hooks()) {
+                byte[] hookBytes = Utils.toByteArray(hook);
+                IWorkerHook hookObject = Utils.javaDeserialize(hookBytes, 
IWorkerHook.class);
+                deserializedWorkerHooks.add(hookObject);
+            }
+        }    
     }
 
     public void refreshConnections() {
@@ -599,20 +613,16 @@ public class WorkerState {
     public void runWorkerStartHooks() {
         WorkerTopologyContext workerContext = getWorkerTopologyContext();
         if (topology.is_set_worker_hooks()) {
-            for (ByteBuffer hook : topology.get_worker_hooks()) {
-                byte[] hookBytes = Utils.toByteArray(hook);
-                BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, 
BaseWorkerHook.class);
-                hookObject.start(topologyConf, workerContext);
+            for (IWorkerHook hook : getDeserializedWorkerHooks()) {
+                hook.start(topologyConf, workerContext);
             }
         }
     }
 
     public void runWorkerShutdownHooks() {
         if (topology.is_set_worker_hooks()) {
-            for (ByteBuffer hook : topology.get_worker_hooks()) {
-                byte[] hookBytes = Utils.toByteArray(hook);
-                BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, 
BaseWorkerHook.class);
-                hookObject.shutdown();
+            for (IWorkerHook hook : getDeserializedWorkerHooks()) {
+                hook.shutdown();
             }
         }
     }

Reply via email to