[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r168057328
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -22,19 +22,23 @@
 import org.apache.storm.executor.TupleInfo;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+// Methods are not thread safe. Each thread expected to have a separate 
instance, or else synchronize externally
--- End diff --

Done.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r168046999
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -22,19 +22,23 @@
 import org.apache.storm.executor.TupleInfo;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+// Methods are not thread safe. Each thread expected to have a separate 
instance, or else synchronize externally
--- End diff --

Yes please, that's the intent.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r168044307
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -22,19 +22,23 @@
 import org.apache.storm.executor.TupleInfo;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+// Methods are not thread safe. Each thread expected to have a separate 
instance, or else synchronize externally
--- End diff --

OK. Please let me know if you plan to figure it out within the time frame of Storm 
2.0.0. I'll add it to the epic for releasing Storm 2.0.0. Thanks!


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r168037754
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -22,19 +22,23 @@
 import org.apache.storm.executor.TupleInfo;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+// Methods are not thread safe. Each thread expected to have a separate 
instance, or else synchronize externally
--- End diff --

It is to figure out what we will have for Storm 2.0... since we cannot make 
any breaking changes thereafter, even if we would like to, until 3.0.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r168034925
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -22,19 +22,23 @@
 import org.apache.storm.executor.TupleInfo;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+// Methods are not thread safe. Each thread expected to have a separate 
instance, or else synchronize externally
--- End diff --

I thought STORM-2945 was filed to find a way to support background emits 
without external synchronization, so it may well remain unresolved in 2.x. If you 
intended STORM-2945 to document how to enable background emits with the current 
state, please ignore this comment.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r168033940
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -22,19 +22,23 @@
 import org.apache.storm.executor.TupleInfo;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+// Methods are not thread safe. Each thread expected to have a separate 
instance, or else synchronize externally
--- End diff --

To nail down and document the concurrent-emit semantics, I had opened 
[STORM-2945](https://issues.apache.org/jira/browse/STORM-2945).


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r168033466
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus {
+public static final short IDENTIFIER = (short)-600;
+private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
+private static final int SIZE_OF_INT = 4;
+
+private static AtomicLong bpCount = new AtomicLong(0);
+
+public String workerId;
+public final long id;   // monotonically 
increasing id
--- End diff --

OK, thanks for the clarification. I took it as some kind of guarantee we 
should ensure. It may be better to call out that it's not a requirement and is only 
for debugging purposes.
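
For example, the field comment could say something like (just a suggestion):

```java
public final long id;   // Monotonically increasing id, used only for debugging (to correlate
                        // sent & received messages); not an ordering or delivery guarantee.
```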


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r168032846
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -22,19 +22,23 @@
 import org.apache.storm.executor.TupleInfo;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+// Methods are not thread safe. Each thread expected to have a separate 
instance, or else synchronize externally
--- End diff --

nit: better to make it a javadoc comment so that it can be surfaced in more ways.

As @revans2 stated, I also think this is a good assumption for a spout, but 
it would be even better to update the restriction in any documentation we have. 
That's just my two cents, not a blocker.
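
For example:

```java
/**
 * Methods are not thread safe. Each thread is expected to have a separate
 * instance, or else callers must synchronize externally.
 */
```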


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r168026607
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus {
+public static final short IDENTIFIER = (short)-600;
+private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
+private static final int SIZE_OF_INT = 4;
+
+private static AtomicLong bpCount = new AtomicLong(0);
+
+public String workerId;
+public final long id;   // monotonically 
increasing id
--- End diff --

It's only for debugging purposes, so that we can correlate sent & received 
msgs. I have used it to measure the latency involved in the transmission of 
BackPressureStatus msgs.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167814459
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --

Thanks for the detailed comment. Made the changes and tested them as well.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167807269
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -44,14 +48,15 @@
 private final Boolean isEventLoggers;
 private final Boolean isDebug;
 private final RotatingMap pending;
+private TupleInfo globalTupleInfo = new TupleInfo();  // thread 
safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --

Getting the id of the current thread involves a call to 
Thread.currentThread(), which is quite 
[expensive](http://www.jutils.com/checks/performance.html)... so it is not a good 
way to decide whether or not to take the fast path.

As a compromise, I am introducing that check only when topology.debug is enabled 
(see the sketch at the end of this comment). That mode can be used for checks that 
are useful in dev mode but unnecessary or too expensive to do repeatedly in 
production.

I have opened 
[STORM-2945](https://issues.apache.org/jira/browse/STORM-2945) to nail down 
and document background emit support; we can document both the spout & bolt 
semantics together in the same document.
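
Roughly along these lines (`expectedEmitterThreadId` is illustrative, not an actual field in the PR; `isDebug` and a `LOG` from the new slf4j imports are assumed to exist on the collector):

```java
// Sketch only: run the thread check only when topology.debug is enabled,
// so the production hot path never pays for Thread.currentThread().
private void checkCallingThreadIfDebug() {
    if (isDebug && Thread.currentThread().getId() != expectedEmitterThreadId) {
        LOG.warn("emit*() called from thread {} instead of the spout executor thread {}; "
                + "these methods are not thread safe.",
                Thread.currentThread().getId(), expectedEmitterThreadId);
    }
}
```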


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-13 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167800249
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java 
---
@@ -194,7 +194,12 @@ public void reportError(Throwable t) {
 public void emitDirect(int task, String stream, List 
values, Object id) {
 throw new UnsupportedOperationException("Trident does not 
support direct streams");
 }
-
+
+@Override
+public void flush() {
+//NOOP   //TODO: Roshan: validate if this is OK
--- End diff --

It needs to flush.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-12 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167625247
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
@@ -40,109 +40,117 @@
 import org.apache.storm.nimbus.NimbusInfo;
 
 public interface IStormClusterState {
-public List assignments(Runnable callback);
+List assignments(Runnable callback);
 
-public Assignment assignmentInfo(String stormId, Runnable callback);
+Assignment assignmentInfo(String stormId, Runnable callback);
 
-public VersionedData assignmentInfoWithVersion(String 
stormId, Runnable callback);
+VersionedData assignmentInfoWithVersion(String stormId, 
Runnable callback);
 
-public Integer assignmentVersion(String stormId, Runnable callback) 
throws Exception;
+Integer assignmentVersion(String stormId, Runnable callback) throws 
Exception;
 
-public List blobstoreInfo(String blobKey);
+List blobstoreInfo(String blobKey);
 
-public List nimbuses();
+List nimbuses();
 
-public void addNimbusHost(String nimbusId, NimbusSummary 
nimbusSummary);
+void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
 
-public List activeStorms();
+List activeStorms();
 
 /**
  * Get a storm base for a topology
  * @param stormId the id of the topology
  * @param callback something to call if the data changes (best effort)
  * @return the StormBase or null if it is not alive.
  */
-public StormBase stormBase(String stormId, Runnable callback);
+StormBase stormBase(String stormId, Runnable callback);
 
-public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, 
String node, Long port);
+ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, 
Long port);
 
-public List getWorkerProfileRequests(String stormId, 
NodeInfo nodeInfo);
+List getWorkerProfileRequests(String stormId, NodeInfo 
nodeInfo);
 
-public List getTopologyProfileRequests(String stormId);
+List getTopologyProfileRequests(String stormId);
 
-public void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
+void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
 
-public void deleteTopologyProfileRequests(String stormId, 
ProfileRequest profileRequest);
+void deleteTopologyProfileRequests(String stormId, ProfileRequest 
profileRequest);
 
-public Map executorBeats(String stormId, 
Map executorNodePort);
+Map executorBeats(String stormId, 
Map executorNodePort);
 
-public List supervisors(Runnable callback);
+List supervisors(Runnable callback);
 
-public SupervisorInfo supervisorInfo(String supervisorId); // returns 
nil if doesn't exist
+SupervisorInfo supervisorInfo(String supervisorId); // returns nil if 
doesn't exist
 
-public void setupHeatbeats(String stormId);
+void setupHeatbeats(String stormId);
 
-public void teardownHeartbeats(String stormId);
+void teardownHeartbeats(String stormId);
 
-public void teardownTopologyErrors(String stormId);
+void teardownTopologyErrors(String stormId);
 
-public List heartbeatStorms();
+List heartbeatStorms();
 
-public List errorTopologies();
+List errorTopologies();
 
-public List backpressureTopologies();
+/** @deprecated: In Storm 2.0. Retained for enabling transition from 
1.x. Will be removed soon. */
--- End diff --

Done. 3.0.0.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-10 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167402753
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --

ControlMessage, MessageBatch and SaslMessageToken are handled explicitly by 
MessageDecoder and MessageEncoder using the buffer and read methods.
When ControlMessage, SaslMessageToken, or MessageBatch write themselves 
out to a buffer, they do not use kryo to do it.


https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java#L60-L64


https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java#L86-L101


https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java#L80-L93

It is a hand-coded protocol (for better or worse).

The messages inside MessageBatch have already been serialized using kryo 
elsewhere in the pipeline.  

For the load messages we hid them as a tuple inside a MessageBatch.  We did 
this so we could do a rolling upgrade with it, but it is an ugly hack.

BackPressureStatus is serialized/deserialized using 
MessageEncoder/MessageDecoder, but it uses kryo internally to do it.

So please remove Serializable from BackPressureStatus.  Register it with 
kryo.  Do not remove `BackPressureStatus.buffer()` nor 
`BackPressureStatus.read()`.  I think the changes to KryoTupleSerializer to 
send a BackPressureStatus can be removed, assuming no one is calling them 
directly.

The simplest way to test this is to run a topology with multiple workers 
and `topology.fall.back.on.java.serialization=false`.  This config makes it so 
kryo does not fall back to java serialization.

For normal tuples/end users you can set 
`topology.testing.always.try.serialize=true` and every tuple emitted will be 
serialized, even in local mode.  This is a way to unit test that you have set up 
kryo appropriately, but BackPressureStatus is a special message, so we need 
to test it differently.
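
Putting the two configs together, a minimal sketch of that test setup (not an actual test in this PR; the raw config keys are the ones quoted above):

```java
import org.apache.storm.Config;

// Sketch only: a multi-worker topology conf that makes a missing kryo registration
// of BackPressureStatus fail fast instead of silently using java serialization.
public class KryoRegistrationSanityCheck {
    public static Config buildConf() {
        Config conf = new Config();
        conf.setNumWorkers(2);  // multiple workers, so messages actually cross the netty transport
        // No fallback to java serialization: kryo must know every class it serializes.
        conf.put("topology.fall.back.on.java.serialization", false);
        // For normal user tuples: force serialization even in local mode.
        conf.put("topology.testing.always.try.serialize", true);
        return conf;
    }
}
```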


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167388018
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --

Hmm, interesting. 
Wondering why ControlMessage, MessageBatch & SaslMessageToken don't follow 
the same registration pattern?

Can you confirm that you are suggesting the following steps?
- Remove the inheritance from the Serializable interface
- Register BackPressureStatus.class with kryo
- Remove the BackPressureStatus.buffer() and BackPressureStatus.read() methods?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167387844
  
--- Diff: pom.xml ---
@@ -259,6 +259,7 @@
 1.11
 4.3.3
 0.2.4
+2.0.1
--- End diff --

I guess I misread things, please ignore this comment.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167387808
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -44,14 +48,15 @@
 private final Boolean isEventLoggers;
 private final Boolean isDebug;
 private final RotatingMap pending;
+private TupleInfo globalTupleInfo = new TupleInfo();  // thread 
safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --

Can we do a sanity check for the fast path + documentation?  Check if the 
thread id is the same as the id of the main thread we expect emits to come 
from.  If so we go with the fast path, if not we have a thread local or do some 
kind of locking + documentation about why you never want the spout to emit from 
a background thread.
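
Something roughly like this is what I have in mind (field names are made up, not from the PR):

```java
// Illustrative sketch only: fast path for the executor thread, thread-local fallback otherwise.
private final long executorThreadId = Thread.currentThread().getId();  // captured when the collector is created
private final ThreadLocal<TupleInfo> tlTupleInfo = ThreadLocal.withInitial(TupleInfo::new);

private TupleInfo tupleInfoToUse() {
    if (Thread.currentThread().getId() == executorThreadId) {
        return globalTupleInfo;     // expected case: emit from the executor thread
    }
    return tlTupleInfo.get();       // background-thread emit gets its own instance
}
```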


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167387609
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --


https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java#L66-L74


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167387629
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --

You should be able to get away with just registering the class and not 
providing a serializer.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167385890
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --

I was not aware of that. Do we have an example of what I should do instead?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167385590
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -44,14 +48,15 @@
 private final Boolean isEventLoggers;
 private final Boolean isDebug;
 private final RotatingMap pending;
+private TupleInfo globalTupleInfo = new TupleInfo();  // thread 
safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --

Accessing a ThreadLocal (TL) instance via ThreadLocal.get() typically 
involves a map lookup behind the scenes.

**Related note:** I reluctantly used a TL for latching on to the 
JCQueue.BatchInserter instance for producers to a JCQueue. Reluctant because I 
noticed a perf hit in some targeted microbenchmarking. I used it anyway 
because it was still a perf improvement over the ConcurrentHashMap employed in 
Disruptor, and eliminating the TL needed a bigger change to the interface and 
producers. I think it is possible to achieve a TL-free JCQueue and gain some 
perf... perhaps in a follow-up jira.

Although many decisions were measured, due to scope it was not feasible to 
measure each one. So, in the critical path, I have taken this general approach:
- avoid locks & synchronization, and try lock/wait-free approaches where 
synchronization is unavoidable
- avoid map lookups and object creation

This was a case of avoiding synchronization, (TL) map lookups & object 
allocation.
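
To illustrate the access patterns being weighed here (illustrative snippet only, not code from the PR):

```java
// Illustrative only: plain field vs ThreadLocal access on the emit path.
private final TupleInfo globalTupleInfo = new TupleInfo();      // plain field read: no lookup, no allocation
private final ThreadLocal<TupleInfo> tlTupleInfo =
        ThreadLocal.withInitial(TupleInfo::new);                // ThreadLocal.get(): per-thread map lookup on every emit

TupleInfo fastPath()       { return globalTupleInfo; }    // single-writer assumption, as in this PR
TupleInfo threadSafePath() { return tlTupleInfo.get(); }  // thread-safe alternative, paying the lookup cost
```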


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167383283
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Acker.java ---
@@ -99,17 +100,19 @@ public void execute(Tuple input) {
 pending.put(id, curr);
 } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
 resetTimeout = true;
-if (curr != null) {
+if (curr == null) {
--- End diff --

Unintentional. This appears to be due to a mistake made most likely while resolving 
merge conflicts when I rebased my code onto master sometime in Dec 2017. Will 
revert. Thanks for catching it.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167381302
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
@@ -40,109 +40,117 @@
 import org.apache.storm.nimbus.NimbusInfo;
 
 public interface IStormClusterState {
-public List assignments(Runnable callback);
+List assignments(Runnable callback);
 
-public Assignment assignmentInfo(String stormId, Runnable callback);
+Assignment assignmentInfo(String stormId, Runnable callback);
 
-public VersionedData assignmentInfoWithVersion(String 
stormId, Runnable callback);
+VersionedData assignmentInfoWithVersion(String stormId, 
Runnable callback);
 
-public Integer assignmentVersion(String stormId, Runnable callback) 
throws Exception;
+Integer assignmentVersion(String stormId, Runnable callback) throws 
Exception;
 
-public List blobstoreInfo(String blobKey);
+List blobstoreInfo(String blobKey);
 
-public List nimbuses();
+List nimbuses();
 
-public void addNimbusHost(String nimbusId, NimbusSummary 
nimbusSummary);
+void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
 
-public List activeStorms();
+List activeStorms();
 
 /**
  * Get a storm base for a topology
  * @param stormId the id of the topology
  * @param callback something to call if the data changes (best effort)
  * @return the StormBase or null if it is not alive.
  */
-public StormBase stormBase(String stormId, Runnable callback);
+StormBase stormBase(String stormId, Runnable callback);
 
-public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, 
String node, Long port);
+ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, 
Long port);
 
-public List getWorkerProfileRequests(String stormId, 
NodeInfo nodeInfo);
+List getWorkerProfileRequests(String stormId, NodeInfo 
nodeInfo);
 
-public List getTopologyProfileRequests(String stormId);
+List getTopologyProfileRequests(String stormId);
 
-public void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
+void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
 
-public void deleteTopologyProfileRequests(String stormId, 
ProfileRequest profileRequest);
+void deleteTopologyProfileRequests(String stormId, ProfileRequest 
profileRequest);
 
-public Map executorBeats(String stormId, 
Map executorNodePort);
+Map executorBeats(String stormId, 
Map executorNodePort);
 
-public List supervisors(Runnable callback);
+List supervisors(Runnable callback);
 
-public SupervisorInfo supervisorInfo(String supervisorId); // returns 
nil if doesn't exist
+SupervisorInfo supervisorInfo(String supervisorId); // returns nil if 
doesn't exist
 
-public void setupHeatbeats(String stormId);
+void setupHeatbeats(String stormId);
 
-public void teardownHeartbeats(String stormId);
+void teardownHeartbeats(String stormId);
 
-public void teardownTopologyErrors(String stormId);
+void teardownTopologyErrors(String stormId);
 
-public List heartbeatStorms();
+List heartbeatStorms();
 
-public List errorTopologies();
+List errorTopologies();
 
-public List backpressureTopologies();
+/** @deprecated: In Storm 2.0. Retained for enabling transition from 
1.x. Will be removed soon. */
--- End diff --

Filed STORM-2944. Can you please add 3.0 as a version in jira so that 
this jira can be associated with it?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167379385
  
--- Diff: pom.xml ---
@@ -259,6 +259,7 @@
 1.11
 4.3.3
 0.2.4
+2.0.1
--- End diff --

I don't see any references to jctools in any flux-related pom.  Can you 
point to the specific offending location?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167286165
  
--- Diff: pom.xml ---
@@ -259,6 +259,7 @@
 1.11
 4.3.3
 0.2.4
+2.0.1
--- End diff --

Why does flux need jctools?  Shouldn't it come with storm-client?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167328981
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -44,14 +48,15 @@
 private final Boolean isEventLoggers;
 private final Boolean isDebug;
 private final RotatingMap pending;
+private TupleInfo globalTupleInfo = new TupleInfo();  // thread 
safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --

This feels like a good assumption for a spout, but I would like to 
understand the cost of making this thread safe (thread local instance etc), and 
at least document it if that cost is high, or preferably find a low cost 
solution to throw an exception if it does happen.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167286543
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -890,30 +871,91 @@
 public static final String TOPOLOGY_ISOLATED_MACHINES = 
"topology.isolate.machines";
 
 /**
- * Configure timeout milliseconds used for disruptor queue wait 
strategy. Can be used to tradeoff latency
- * vs. CPU usage
+ * Selects the Bolt's Wait Strategy to use when there are no incoming 
msgs. Used to trade off latency vs CPU usage.
+ */
+@isString
--- End diff --

Can we check if this is an instance of the proper parent interface?
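
e.g. something roughly like this early on (sketch only, assuming the configured class is expected to implement the new `org.apache.storm.policy.IWaitStrategy` interface; this is not Storm's actual config-validation mechanism):

```java
import org.apache.storm.policy.IWaitStrategy;

// Sketch only: resolve the configured class name and fail early if it does not
// implement the expected wait-strategy interface.
static void validateWaitStrategyClass(String className) {
    try {
        Class<?> clazz = Class.forName(className);
        if (!IWaitStrategy.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(
                className + " does not implement " + IWaitStrategy.class.getName());
        }
    } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Wait strategy class not found: " + className, e);
    }
}
```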


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167325679
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.Config;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.serialization.ITupleSerializer;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.TransferDrainer;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Utils.SmartThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+// Transfers messages destined to other workers
+class WorkerTransfer implements JCQueue.Consumer {
+static final Logger LOG = 
LoggerFactory.getLogger(WorkerTransfer.class);
+
+private final TransferDrainer drainer;
+private WorkerState workerState;
+private IWaitStrategy backPressureWaitStrategy;
+
+JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries 
maybe null (if no emits to those tasksIds from this worker)
+AtomicBoolean[] remoteBackPressureStatus; // [[remoteTaskId] -> 
true/false : indicates if remote task is under BP.
--- End diff --

Same here for package private.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167288058
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Acker.java ---
@@ -99,17 +100,19 @@ public void execute(Tuple input) {
 pending.put(id, curr);
 } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
 resetTimeout = true;
-if (curr != null) {
+if (curr == null) {
--- End diff --

Why is this change being made?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167286681
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -890,30 +871,91 @@
 public static final String TOPOLOGY_ISOLATED_MACHINES = 
"topology.isolate.machines";
 
 /**
- * Configure timeout milliseconds used for disruptor queue wait 
strategy. Can be used to tradeoff latency
- * vs. CPU usage
+ * Selects the Bolt's Wait Strategy to use when there are no incoming 
msgs. Used to trade off latency vs CPU usage.
+ */
+@isString
+public static final String TOPOLOGY_BOLT_WAIT_STRATEGY = 
"topology.bolt.wait.strategy";
+
+/**
+ * Configures park time for WaitStrategyPark.  If set to 0, returns 
immediately (i.e busy wait).
  */
-@isInteger
 @NotNull
-public static final String 
TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
+@isPositiveNumber(includeZero = true)
+public static final String TOPOLOGY_BOLT_WAIT_PARK_MICROSEC = 
"topology.bolt.wait.park.microsec";
 
 /**
- * The number of tuples to batch before sending to the next thread.  
This number is just an initial suggestion and
- * the code may adjust it as your topology runs.
+ * Configures number of iterations to spend in level 1 of 
WaitStrategyProgressive, before progressing to level 2
  */
+@NotNull
 @isInteger
 @isPositiveNumber
+public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT 
=  "topology.bolt.wait.progressive.level1.count";
+
+/**
+ * Configures number of iterations to spend in level 2 of 
WaitStrategyProgressive, before progressing to level 3
+ */
 @NotNull
-public static final String 
TOPOLOGY_DISRUPTOR_BATCH_SIZE="topology.disruptor.batch.size";
+@isInteger
+@isPositiveNumber
+public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT 
=  "topology.bolt.wait.progressive.level2.count";
 
 /**
- * The maximum age in milliseconds a batch can be before being sent to 
the next thread.  This number is just an
- * initial suggestion and the code may adjust it as your topology runs.
+ * Configures sleep time for WaitStrategyProgressive.
  */
+@NotNull
+@isPositiveNumber(includeZero = true)
+public static final String 
TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = 
"topology.bolt.wait.progressive.level3.sleep.millis";
+
+
+/**
+ * A class that implements a wait strategy for an upstream component 
(spout/bolt) trying to write to a downstream component
+ * whose recv queue is full
+ *
+ * 1. nextTuple emits no tuples
+ * 2. The spout has hit maxSpoutPending and can't emit any more tuples
+ */
+@isString
+public static final String 
TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY="topology.backpressure.wait.strategy";
--- End diff --

Here too, if this is a class name, it would be good to verify early on that it is 
an instance of a given class.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167325584
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.Config;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.serialization.ITupleSerializer;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.TransferDrainer;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Utils.SmartThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+// Transfers messages destined to other workers
+class WorkerTransfer implements JCQueue.Consumer {
+static final Logger LOG = 
LoggerFactory.getLogger(WorkerTransfer.class);
+
+private final TransferDrainer drainer;
+private WorkerState workerState;
+private IWaitStrategy backPressureWaitStrategy;
+
+JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries 
maybe null (if no emits to those tasksIds from this worker)
--- End diff --

nit: why does this need to be package private?  Can we lock it down more?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167282875
  
--- Diff: docs/Concepts.md ---
@@ -113,3 +113,8 @@ Topologies execute across one or more worker processes. 
Each worker process is a
 **Resources:**
 
 * 
[Config.TOPOLOGY_WORKERS](javadocs/org/apache/storm/Config.html#TOPOLOGY_WORKERS):
 this config sets the number of workers to allocate for executing the topology
+
+### Performance Tuning
+
+Refer to [performance tuning guide](docs/CONTRIBUTING.md)
--- End diff --

I don't think `CONTRIBUTING.md` is the performance tuning guide. Also, the 
path is relative, and since `Performance.md` is in the same directory as this 
file, we don't need the `docs/` prefix in the link.
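
For example, `Refer to the [Performance Tuning Guide](Performance.md)` would resolve correctly from `Concepts.md` (assuming the guide is published as `Performance.md` alongside it).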


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167287026
  
--- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java ---
@@ -97,6 +97,8 @@ public void run() {
 // events.
 Time.sleep(1000);
 }
+if(Thread.interrupted())
+this.active.set(false);
--- End diff --

Nit: can we wrap this in `'{'` and `'}'` and add a space after the `if`, to 
match the style guide?
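
i.e.:

```java
if (Thread.interrupted()) {
    this.active.set(false);
}
```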


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167330523
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --

Why does this need to be serializable if we are explicitly using kryo for 
all of the serialization?  Can we just register the class with kryo instead so 
this will also work when java serialization is disabled?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167331543
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java 
---
@@ -194,7 +194,12 @@ public void reportError(Throwable t) {
 public void emitDirect(int task, String stream, List 
values, Object id) {
 throw new UnsupportedOperationException("Trident does not 
support direct streams");
 }
-
+
+@Override
+public void flush() {
+//NOOP   //TODO: Roshan: validate if this is OK
--- End diff --

The TODO needs to go away.  Is it OK to not have a flush here?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167326878
  
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -313,107 +330,74 @@ public void metricsTick(Task taskData, TupleImpl 
tuple) {
 protected void setupMetrics() {
 for (final Integer interval : 
intervalToTaskToMetricToRegistry.keySet()) {
 StormTimer timerTask = workerData.getUserTimer();
-timerTask.scheduleRecurring(interval, interval, new Runnable() 
{
-@Override
-public void run() {
-TupleImpl tuple = new TupleImpl(workerTopologyContext, 
new Values(interval),
-(int) Constants.SYSTEM_TASK_ID, 
Constants.METRICS_TICK_STREAM_ID);
-List metricsTickTuple =
-Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
-receiveQueue.publish(metricsTickTuple);
+timerTask.scheduleRecurring(interval, interval,
+() -> {
+TupleImpl tuple = new TupleImpl(workerTopologyContext, 
new Values(interval), Constants.SYSTEM_COMPONENT_ID,
+(int) Constants.SYSTEM_TASK_ID, 
Constants.METRICS_TICK_STREAM_ID);
+AddressedTuple metricsTickTuple = new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+try {
+receiveQueue.publish(metricsTickTuple);
+receiveQueue.flush();  // avoid buffering
+} catch (InterruptedException e) {
+LOG.warn("Thread interrupted when publishing 
metrics. Setting interrupt flag.");
+Thread.currentThread().interrupt();
+return;
+}
 }
-});
-}
-}
-
-public void sendUnanchored(Task task, String stream, List 
values, ExecutorTransfer transfer) {
-Tuple tuple = task.getTuple(stream, values);
-List tasks = task.getOutgoingTasks(stream, values);
-for (Integer t : tasks) {
-transfer.transfer(t, tuple);
-}
-}
-
-/**
- * Send sampled data to the eventlogger if the global or component 
level debug flag is set (via nimbus api).
- */
-public void sendToEventLogger(Executor executor, Task taskData, List 
values,
-  String componentId, Object messageId, 
Random random) {
-Map componentDebug = 
executor.getStormComponentDebug().get();
-DebugOptions debugOptions = componentDebug.get(componentId);
-if (debugOptions == null) {
-debugOptions = componentDebug.get(executor.getStormId());
-}
-double spct = ((debugOptions != null) && 
(debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
-if (spct > 0 && (random.nextDouble() * 100) < spct) {
-sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
-new Values(componentId, messageId, 
System.currentTimeMillis(), values),
-executor.getExecutorTransfer());
+);
 }
 }
 
-public void reflectNewLoadMapping(LoadMapping loadMapping) {
-for (LoadAwareCustomStreamGrouping g : groupers) {
-g.refreshLoad(loadMapping);
-}
-}
-
-private void registerBackpressure() {
-receiveQueue.registerBackpressureCallback(new 
DisruptorBackpressureCallback() {
-@Override
-public void highWaterMark() throws Exception {
-LOG.debug("executor " + executorId + " is congested, set 
backpressure flag true");
-
WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
-}
-
-@Override
-public void lowWaterMark() throws Exception {
-LOG.debug("executor " + executorId + " is not-congested, 
set backpressure flag false");
-
WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
-}
-});
-
receiveQueue.setHighWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
-
receiveQueue.setLowWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
receiveQueue.setEnableBackpressure(ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE),
 false));
-}
-
 protected void setupTicks(boolean 

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167293987
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
@@ -456,137 +450,135 @@ public void refreshStormActive(Runnable callback) {
 }
 }
 
-public void refreshThrottle() {
-boolean backpressure = 
stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, 
this::refreshThrottle);
-this.throttleOn.set(backpressure);
-}
-
-private static double getQueueLoad(DisruptorQueue q) {
-DisruptorQueue.QueueMetrics qMetrics = q.getMetrics();
+private static double getQueueLoad(JCQueue q) {
+JCQueue.QueueMetrics qMetrics = q.getMetrics();
 return ((double) qMetrics.population()) / qMetrics.capacity();
 }
 
 public void refreshLoad(List execs) {
-Set remoteTasks = Sets.difference(new 
HashSet<>(outboundTasks), new HashSet<>(taskIds));
+Set remoteTasks = Sets.difference(new 
HashSet<>(outboundTasks), new HashSet<>(localTaskIds));
 Long now = System.currentTimeMillis();
 Map localLoad = new HashMap<>();
-for (IRunningExecutor exec: execs) {
+for (IRunningExecutor exec : execs) {
 double receiveLoad = getQueueLoad(exec.getReceiveQueue());
-double sendLoad = getQueueLoad(exec.getSendQueue());
-localLoad.put(exec.getExecutorId().get(0).intValue(), 
Math.max(receiveLoad, sendLoad));
+localLoad.put(exec.getExecutorId().get(0).intValue(), 
receiveLoad);
 }
 
 Map remoteLoad = new HashMap<>();
 cachedNodeToPortSocket.get().values().stream().forEach(conn -> 
remoteLoad.putAll(conn.getLoad(remoteTasks)));
 loadMapping.setLocal(localLoad);
 loadMapping.setRemote(remoteLoad);
 
-if (now > nextUpdate.get()) {
+if (now > nextLoadUpdate.get()) {
 receiver.sendLoadMetrics(localLoad);
-nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
+nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
 }
 }
 
+// checks if the tasks which had back pressure are now free again. if 
so, sends an update to other workers
+public void refreshBackPressureStatus() {
+LOG.debug("Checking for change in Backpressure status on worker's 
tasks");
+boolean bpSituationChanged = bpTracker.refreshBpTaskList();
+if (bpSituationChanged) {
+BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+receiver.sendBackPressureStatus(bpStatus);
+}
+}
+
+
 /**
  * we will wait all connections to be ready and then activate the 
spout/bolt
  * when the worker bootup.
  */
 public void activateWorkerWhenAllConnectionsReady() {
 int delaySecs = 0;
 int recurSecs = 1;
-refreshActiveTimer.schedule(delaySecs, new Runnable() {
-@Override public void run() {
+refreshActiveTimer.schedule(delaySecs,
+() -> {
 if (areAllConnectionsReady()) {
 LOG.info("All connections are ready for worker {}:{} 
with id {}", assignmentId, port, workerId);
 isWorkerActive.set(Boolean.TRUE);
 } else {
 refreshActiveTimer.schedule(recurSecs, () -> 
activateWorkerWhenAllConnectionsReady(), false, 0);
 }
 }
-});
+);
 }
 
 public void registerCallbacks() {
 LOG.info("Registering IConnectionCallbacks for {}:{}", 
assignmentId, port);
 receiver.registerRecv(new 
DeserializingConnectionCallback(topologyConf,
 getWorkerTopologyContext(),
-this::transferLocal));
+this::transferLocalBatch));
+// Send curr BackPressure status to new clients
+receiver.registerNewConnectionResponse(
+() -> {
+BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+LOG.info("Sending BackPressure status to new client. 
BPStatus: {}", bpStatus);
+return bpStatus;
+}
+);
 }
 
-public void transferLocal(List tupleBatch) {
-Map grouped = new HashMap<>();
-for (AddressedTuple tuple : tupleBatch) {
-Integer executor = taskToShortExecutor.get(tuple.dest);
-if (null == executor) {
-LOG.warn("Received invalid messages for unknown tasks. 
Dropping... ");
-

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167287393
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
@@ -40,109 +40,117 @@
 import org.apache.storm.nimbus.NimbusInfo;
 
 public interface IStormClusterState {
-public List assignments(Runnable callback);
+List assignments(Runnable callback);
 
-public Assignment assignmentInfo(String stormId, Runnable callback);
+Assignment assignmentInfo(String stormId, Runnable callback);
 
-public VersionedData assignmentInfoWithVersion(String 
stormId, Runnable callback);
+VersionedData assignmentInfoWithVersion(String stormId, 
Runnable callback);
 
-public Integer assignmentVersion(String stormId, Runnable callback) 
throws Exception;
+Integer assignmentVersion(String stormId, Runnable callback) throws 
Exception;
 
-public List blobstoreInfo(String blobKey);
+List blobstoreInfo(String blobKey);
 
-public List nimbuses();
+List nimbuses();
 
-public void addNimbusHost(String nimbusId, NimbusSummary 
nimbusSummary);
+void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
 
-public List activeStorms();
+List activeStorms();
 
 /**
  * Get a storm base for a topology
  * @param stormId the id of the topology
  * @param callback something to call if the data changes (best effort)
  * @return the StormBase or null if it is not alive.
  */
-public StormBase stormBase(String stormId, Runnable callback);
+StormBase stormBase(String stormId, Runnable callback);
 
-public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, 
String node, Long port);
+ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, 
Long port);
 
-public List getWorkerProfileRequests(String stormId, 
NodeInfo nodeInfo);
+List getWorkerProfileRequests(String stormId, NodeInfo 
nodeInfo);
 
-public List getTopologyProfileRequests(String stormId);
+List getTopologyProfileRequests(String stormId);
 
-public void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
+void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
 
-public void deleteTopologyProfileRequests(String stormId, 
ProfileRequest profileRequest);
+void deleteTopologyProfileRequests(String stormId, ProfileRequest 
profileRequest);
 
-public Map executorBeats(String stormId, 
Map executorNodePort);
+Map executorBeats(String stormId, 
Map executorNodePort);
 
-public List supervisors(Runnable callback);
+List supervisors(Runnable callback);
 
-public SupervisorInfo supervisorInfo(String supervisorId); // returns 
nil if doesn't exist
+SupervisorInfo supervisorInfo(String supervisorId); // returns nil if 
doesn't exist
 
-public void setupHeatbeats(String stormId);
+void setupHeatbeats(String stormId);
 
-public void teardownHeartbeats(String stormId);
+void teardownHeartbeats(String stormId);
 
-public void teardownTopologyErrors(String stormId);
+void teardownTopologyErrors(String stormId);
 
-public List heartbeatStorms();
+List heartbeatStorms();
 
-public List errorTopologies();
+List errorTopologies();
 
-public List backpressureTopologies();
+/** @deprecated: In Storm 2.0. Retained for enabling transition from 
1.x. Will be removed soon. */
--- End diff --

Could you file a JIRA for us to remove it in 3.x?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167288382
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -362,6 +362,8 @@ public static void addSystemStreams(StormTopology 
topology) {
 public static void addEventLogger(Map conf, 
StormTopology topology) {
 Integer numExecutors = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
 ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+if(numExecutors==null || numExecutors==0)
--- End diff --

nit: can we fix the style here, with a space after the `if` and braces `{ }`?
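
The requested style would look roughly like this (sketch only; the statement body
is not shown in the quoted diff):

```java
if (numExecutors == null || numExecutors == 0) {
    return; // assumed body: skip adding the event logger when none are configured
}
```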


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167285848
  
--- Diff: 
flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvironmentTest.java
 ---
@@ -18,6 +18,7 @@
 package org.apache.storm.flux.multilang;
 
 
+import org.junit.Ignore;
--- End diff --

nit: I don't think this is used in here, so can we remove it?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-08 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167097905
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
@@ -17,72 +17,124 @@
  */
 package org.apache.storm.executor;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.EventHandler;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.serialization.KryoTupleSerializer;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.DisruptorQueue;
-import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Queue;
 
-public class ExecutorTransfer implements EventHandler, Callable {
+// Every executor has an instance of this class
+public class ExecutorTransfer  {
 private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorTransfer.class);
 
 private final WorkerState workerData;
-private final DisruptorQueue batchTransferQueue;
-private final Map topoConf;
 private final KryoTupleSerializer serializer;
-private final MutableObject cachedEmit;
 private final boolean isDebug;
+private final int producerBatchSz;
+private int remotesBatchSz = 0;
+private int indexingBase = 0;
+private ArrayList localReceiveQueues; // 
[taskId-indexingBase] => queue : List of all recvQs local to this worker
+private ArrayList queuesToFlush; // [taskId-indexingBase] => 
queue, some entries can be null. : outbound Qs for this executor instance
 
-public ExecutorTransfer(WorkerState workerData, DisruptorQueue 
batchTransferQueue, Map topoConf) {
+
+public ExecutorTransfer(WorkerState workerData, Map 
topoConf) {
 this.workerData = workerData;
-this.batchTransferQueue = batchTransferQueue;
-this.topoConf = topoConf;
 this.serializer = new KryoTupleSerializer(topoConf, 
workerData.getWorkerTopologyContext());
-this.cachedEmit = new MutableObject(new ArrayList<>());
 this.isDebug = 
ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
+this.producerBatchSz = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+}
+
+// to be called after all Executor objects in the worker are created 
and before this object is used
+public void initLocalRecvQueues() {
+Integer minTaskId = 
workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
+this.localReceiveQueues = Utils.convertToArray( 
workerData.getLocalReceiveQueues(), minTaskId);
+this.indexingBase = minTaskId;
+this.queuesToFlush = new 
ArrayList(Collections.nCopies(localReceiveQueues.size(), null) );
 }
 
-public void transfer(int task, Tuple tuple) {
-AddressedTuple val = new AddressedTuple(task, tuple);
+// adds addressedTuple to destination Q if it is not full. else adds 
to pendingEmits (if its not null)
+public boolean tryTransfer(AddressedTuple addressedTuple, 
Queue pendingEmits) {
 if (isDebug) {
-LOG.info("TRANSFERRING tuple {}", val);
+LOG.info("TRANSFERRING tuple {}", addressedTuple);
+}
+
+JCQueue localQueue = getLocalQueue(addressedTuple);
+if (localQueue!=null) {
+return tryTransferLocal(addressedTuple, localQueue, 
pendingEmits);
+}  else  {
+if (remotesBatchSz >= producerBatchSz) {
+if ( !workerData.tryFlushRemotes() ) {
+if (pendingEmits != null) {
+pendingEmits.add(addressedTuple);
+}
+return false;
+}
+remotesBatchSz = 0;
--- End diff --

@roshannaik and I briefly talked about this and agree this should be the thing to
fix. A clear but higher-impact way is guarding the whole else block with
`synchronized` (or more efficient locking, if any), but @roshannaik wanted to
explore ways to minimize the impact of synchronization.
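
For illustration, the heavier "guard the whole else block" option would look
roughly like this (a sketch only, reusing names from the quoted diff;
`remoteLock` and the trailing hand-off are hypothetical):

```java
// Sketch only: the remote branch of tryTransfer() guarded by a lock so that
// remotesBatchSz stays consistent when emit() is called from multiple threads.
private final Object remoteLock = new Object();   // hypothetical lock field

private boolean tryTransferRemote(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
    synchronized (remoteLock) {
        if (remotesBatchSz >= producerBatchSz) {
            if (!workerData.tryFlushRemotes()) {
                if (pendingEmits != null) {
                    pendingEmits.add(addressedTuple);
                }
                return false;
            }
            remotesBatchSz = 0;
        }
        remotesBatchSz++;   // count this tuple toward the current remote batch
        // ... serialize and hand the tuple off to the worker's remote transfer queue here ...
        return true;
    }
}
```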


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r164971328
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
 ---
@@ -89,63 +107,80 @@ public void emitDirect(int taskId, String streamId, 
Collection anchors, L
 }
 }
 }
+msgId = MessageId.makeId(anchorsToIds);
+} else {
+msgId = MessageId.makeUnanchored();
 }
-MessageId msgId = MessageId.makeId(anchorsToIds);
-TupleImpl tupleExt = new 
TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
-executor.getExecutorTransfer().transfer(t, tupleExt);
+TupleImpl tupleExt = new 
TupleImpl(executor.getWorkerTopologyContext(), values, 
executor.getComponentId(), taskId, streamId, msgId);
+xsfer.tryTransfer(new AddressedTuple(t, tupleExt), 
executor.getPendingEmits());
 }
 if (isEventLoggers) {
-executor.sendToEventLogger(executor, taskData, values, 
executor.getComponentId(), null, random);
+task.sendToEventLogger(executor, values, 
executor.getComponentId(), null, random, executor.getPendingEmits());
 }
 return outTasks;
 }
 
 @Override
 public void ack(Tuple input) {
+if(!ackingEnabled)
+return;
 long ackValue = ((TupleImpl) input).getAckVal();
 Map anchorsToIds = 
input.getMessageId().getAnchorsToIds();
 for (Map.Entry entry : anchorsToIds.entrySet()) {
-executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
+task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
 new Values(entry.getKey(), 
Utils.bitXor(entry.getValue(), ackValue)),
-executor.getExecutorTransfer());
+executor.getExecutorTransfer(), 
executor.getPendingEmits());
 }
 long delta = tupleTimeDelta((TupleImpl) input);
 if (isDebug) {
 LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, 
delta, input);
 }
-BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
-boltAckInfo.applyOn(taskData.getUserContext());
+
+if ( !task.getUserContext().getHooks().isEmpty() ) {
+BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, 
delta);
+boltAckInfo.applyOn(task.getUserContext());
+}
 if (delta >= 0) {
-((BoltExecutorStats) executor.getStats()).boltAckedTuple(
-input.getSourceComponent(), input.getSourceStreamId(), 
delta);
+executor.getStats().boltAckedTuple(input.getSourceComponent(), 
input.getSourceStreamId(), delta);
 }
 }
 
 @Override
 public void fail(Tuple input) {
+if(!ackingEnabled)
+return;
 Set roots = input.getMessageId().getAnchors();
 for (Long root : roots) {
-executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
-new Values(root), executor.getExecutorTransfer());
+task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
+new Values(root), executor.getExecutorTransfer(), 
executor.getPendingEmits());
 }
 long delta = tupleTimeDelta((TupleImpl) input);
 if (isDebug) {
 LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, 
delta, input);
 }
 BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
-boltFailInfo.applyOn(taskData.getUserContext());
-if (delta >= 0) {
-((BoltExecutorStats) executor.getStats()).boltFailedTuple(
-input.getSourceComponent(), input.getSourceStreamId(), 
delta);
+boltFailInfo.applyOn(task.getUserContext());
+if (delta != 0) {
--- End diff --

Looks like a missed spot: this should be `delta >= 0`.

https://github.com/apache/storm/pull/2241/files?diff=split#r158213916

```
(when (<= 0 delta)
  (stats/bolt-failed-tuple! executor-stats
                            (.getSourceComponent tuple)
                            (.getSourceStreamId tuple)
                            delta))
```
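
In the Java port, the fix would make fail() mirror the ack() path, roughly like
this (a sketch of the intended check, not the merged code):

```java
if (delta >= 0) {
    executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
}
```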


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-30 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r164956555
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
@@ -17,72 +17,124 @@
  */
 package org.apache.storm.executor;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.EventHandler;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.serialization.KryoTupleSerializer;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.DisruptorQueue;
-import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Queue;
 
-public class ExecutorTransfer implements EventHandler, Callable {
+// Every executor has an instance of this class
+public class ExecutorTransfer  {
 private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorTransfer.class);
 
 private final WorkerState workerData;
-private final DisruptorQueue batchTransferQueue;
-private final Map topoConf;
 private final KryoTupleSerializer serializer;
-private final MutableObject cachedEmit;
 private final boolean isDebug;
+private final int producerBatchSz;
+private int remotesBatchSz = 0;
+private int indexingBase = 0;
+private ArrayList localReceiveQueues; // 
[taskId-indexingBase] => queue : List of all recvQs local to this worker
+private ArrayList queuesToFlush; // [taskId-indexingBase] => 
queue, some entries can be null. : outbound Qs for this executor instance
 
-public ExecutorTransfer(WorkerState workerData, DisruptorQueue 
batchTransferQueue, Map topoConf) {
+
+public ExecutorTransfer(WorkerState workerData, Map 
topoConf) {
 this.workerData = workerData;
-this.batchTransferQueue = batchTransferQueue;
-this.topoConf = topoConf;
 this.serializer = new KryoTupleSerializer(topoConf, 
workerData.getWorkerTopologyContext());
-this.cachedEmit = new MutableObject(new ArrayList<>());
 this.isDebug = 
ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
+this.producerBatchSz = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+}
+
+// to be called after all Executor objects in the worker are created 
and before this object is used
+public void initLocalRecvQueues() {
+Integer minTaskId = 
workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
+this.localReceiveQueues = Utils.convertToArray( 
workerData.getLocalReceiveQueues(), minTaskId);
+this.indexingBase = minTaskId;
+this.queuesToFlush = new 
ArrayList(Collections.nCopies(localReceiveQueues.size(), null) );
 }
 
-public void transfer(int task, Tuple tuple) {
-AddressedTuple val = new AddressedTuple(task, tuple);
+// adds addressedTuple to destination Q if it is not full. else adds 
to pendingEmits (if its not null)
+public boolean tryTransfer(AddressedTuple addressedTuple, 
Queue pendingEmits) {
 if (isDebug) {
-LOG.info("TRANSFERRING tuple {}", val);
+LOG.info("TRANSFERRING tuple {}", addressedTuple);
+}
+
+JCQueue localQueue = getLocalQueue(addressedTuple);
+if (localQueue!=null) {
+return tryTransferLocal(addressedTuple, localQueue, 
pendingEmits);
+}  else  {
+if (remotesBatchSz >= producerBatchSz) {
+if ( !workerData.tryFlushRemotes() ) {
+if (pendingEmits != null) {
+pendingEmits.add(addressedTuple);
+}
+return false;
+}
+remotesBatchSz = 0;
--- End diff --

@revans2 
There is one instance of the ExecutorTransfer object per Executor. So the only way
to invoke this concurrently is as follows:
- background threads are spun up by the spout/bolt executor, and
- outputcollector.emit() is called from those threads without any external
synchronization.

Is that the situation you were thinking of?
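
For concreteness, the scenario described above would look something like this on
the user side (a hypothetical bolt, not code from this PR):

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class AsyncEmittingBolt extends BaseRichBolt {
    private OutputCollector collector;
    private ExecutorService background;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.background = Executors.newSingleThreadExecutor();
    }

    @Override
    public void execute(Tuple input) {
        // emitting from a user-owned thread means concurrent calls into the
        // executor's single ExecutorTransfer instance, unless the user synchronizes
        background.execute(() -> collector.emit(new Values(input.getValue(0))));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));
    }
}
```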



---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-30 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r164955129
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.utils.JCQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+
+public class BackPressureTracker {
+static final Logger LOG = 
LoggerFactory.getLogger(BackPressureTracker.class);
+
+private final Map bpTasks = new 
ConcurrentHashMap<>(); // updates are more frequent than iteration
+private final Set nonBpTasks = ConcurrentHashMap.newKeySet();
+private final String workerId;
+
+public BackPressureTracker(String workerId, List 
allLocalTasks) {
+this.workerId = workerId;
+this.nonBpTasks.addAll(allLocalTasks);// all tasks are 
considered to be not under BP initially
+this.nonBpTasks.remove((int)SYSTEM_TASK_ID);   // not tracking 
system task
+}
+
+/* called by transferLocalBatch() on NettyWorker thread
+ * returns true if an update was recorded, false if taskId is already 
under BP
+ */
+public boolean recordBackpressure(Integer taskId, JCQueue recvQ) {
+if (nonBpTasks.remove(taskId)) {
+bpTasks.put(taskId, recvQ);
+return true;
+}
+return false;
+}
+
+// returns true if there was a change in the BP situation
+public boolean refreshBpTaskList() {
+boolean changed = false;
+LOG.debug("Running Back Pressure status change check");
+for (Iterator> itr = 
bpTasks.entrySet().iterator(); itr.hasNext(); ) {
+Entry entry = itr.next();
+if (entry.getValue().isEmptyOverflow()) {
+// move task from bpTasks to noBpTasks
+nonBpTasks.add(entry.getKey());
+itr.remove();
--- End diff --

Next update will simplify the logic by using a single map and fix this 
issue as well.
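
A minimal sketch of that single-map shape, for reference (hypothetical names; not
the actual follow-up code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.storm.utils.JCQueue;

// One entry per local task; a flag records whether that task is currently under
// backpressure, so there is no separate nonBpTasks set to race against.
public class SingleMapBpTrackerSketch {
    private static class TaskState {
        final JCQueue recvQ;
        final AtomicBoolean underBp = new AtomicBoolean(false);
        TaskState(JCQueue recvQ) { this.recvQ = recvQ; }
    }

    private final Map<Integer, TaskState> tasks = new ConcurrentHashMap<>();

    public SingleMapBpTrackerSketch(Map<Integer, JCQueue> taskToRecvQ) {
        taskToRecvQ.forEach((taskId, q) -> tasks.put(taskId, new TaskState(q)));
    }

    /** Returns true only on a false -> true transition for this task. */
    public boolean recordBackpressure(Integer taskId) {
        return !tasks.get(taskId).underBp.getAndSet(true);
    }

    /** Returns true if any task transitioned back to "not under backpressure". */
    public boolean refreshBpTaskList() {
        boolean changed = false;
        for (TaskState state : tasks.values()) {
            if (state.underBp.get() && state.recvQ.isEmptyOverflow()) {
                changed |= state.underBp.compareAndSet(true, false);
            }
        }
        return changed;
    }
}
```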


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-25 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r163967849
  
--- Diff: storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java 
---
@@ -47,7 +47,6 @@
 public static final String STORMS_ROOT = "storms";
 public static final String SUPERVISORS_ROOT = "supervisors";
 public static final String WORKERBEATS_ROOT = "workerbeats";
-public static final String BACKPRESSURE_ROOT = "backpressure";
--- End diff --

@revans2 can you clarify? 2.x will require topology jars to be rebuilt for it.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161361250
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java ---
@@ -76,6 +76,32 @@ public static Integer getInt(Object o, Integer 
defaultValue) {
 throw new IllegalArgumentException("Don't know how to convert " + 
o + " to int");
 }
 
+public static Long getLong(Object o) {
+return getLong(o, null);
+}
+
+public static Long getLong(Object o, Long defaultValue) {
+if (null == o) {
+return defaultValue;
+}
+
+if ( o instanceof Long ||
+o instanceof Integer ||
+o instanceof Short ||
+o instanceof Byte) {
+return ((Number) o).longValue();
+} else if (o instanceof Double) {
+final long l = (Long) o;
--- End diff --

Yes, my bad. I think it's OK to throw an exception for numbers larger than a Long
can hold.
I don't see the bug in getInt.
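
A possible shape for that Double branch, throwing once the value falls outside the
long range (a sketch only; the merged version may differ):

```java
public static Long getLong(Object o, Long defaultValue) {
    if (o == null) {
        return defaultValue;
    }
    if (o instanceof Long || o instanceof Integer || o instanceof Short || o instanceof Byte) {
        return ((Number) o).longValue();
    }
    if (o instanceof Double) {
        final double d = (Double) o;
        if (d >= Long.MIN_VALUE && d <= Long.MAX_VALUE) {
            return (long) d;   // note: silently drops any fractional part
        }
        throw new IllegalArgumentException(o + " is out of range for a long");
    }
    throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
}
```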


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161361184
  
--- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java ---
@@ -24,50 +24,46 @@
 import org.apache.storm.task.GeneralTopologyContext;
 
 public class TupleImpl implements Tuple {
-private final List values;
-private final int taskId;
-private final String streamId;
-private final GeneralTopologyContext context;
-private final MessageId id;
+private List values;
+private int taskId;
+private String streamId;
+private GeneralTopologyContext context;
+private MessageId id;
+private final String srcComponent;
 private Long _processSampleStartTime;
 private Long _executeSampleStartTime;
 private long _outAckVal = 0;
-
+
 public TupleImpl(Tuple t) {
 this.values = t.getValues();
 this.taskId = t.getSourceTask();
 this.streamId = t.getSourceStreamId();
 this.id = t.getMessageId();
 this.context = t.getContext();
-if (t instanceof TupleImpl) {
+this.srcComponent = t.getSourceComponent();
+try {
 TupleImpl ti = (TupleImpl) t;
 this._processSampleStartTime = ti._processSampleStartTime;
 this._executeSampleStartTime = ti._executeSampleStartTime;
 this._outAckVal = ti._outAckVal;
+} catch (ClassCastException e) {
+// ignore ... if t is not a TupleImpl type .. faster than 
checking and then casting
 }
 }
 
-public TupleImpl(GeneralTopologyContext context, List values, 
int taskId, String streamId, MessageId id) {
+public TupleImpl(GeneralTopologyContext context, List values, 
String srcComponent, int taskId, String streamId, MessageId id) {
 this.values = Collections.unmodifiableList(values);
 this.taskId = taskId;
 this.streamId = streamId;
 this.id = id;
 this.context = context;
-
-String componentId = context.getComponentId(taskId);
-Fields schema = context.getComponentOutputFields(componentId, 
streamId);
-if(values.size()!=schema.size()) {
-throw new IllegalArgumentException(
-"Tuple created with wrong number of fields. " +
-"Expected " + schema.size() + " fields but got " +
-values.size() + " fields");
-}
--- End diff --

OK, enabling it for local mode. Doing my best to eliminate map lookups in the
critical path.
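
A rough sketch of what "enabled only for local mode" could look like in the
TupleImpl constructor (the guard flag is an assumption, not necessarily the final
API):

```java
if (context.doSanityCheck()) {   // assumed flag on the topology context, e.g. true only in local mode
    Fields schema = context.getComponentOutputFields(context.getComponentId(taskId), streamId);
    if (values.size() != schema.size()) {
        throw new IllegalArgumentException(
            "Tuple created with wrong number of fields. "
            + "Expected " + schema.size() + " fields but got " + values.size() + " fields");
    }
}
```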


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161359459
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java 
---
@@ -20,6 +20,7 @@
 import org.apache.storm.Config;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.messaging.netty.BackPressureStatus;
--- End diff --

fixed.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161359446
  
--- Diff: storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java ---
@@ -23,7 +23,7 @@
 import java.io.Serializable;
 import java.util.List;
 
-public class TupleInfo implements Serializable {
+public final class TupleInfo implements Serializable {
--- End diff --

I think I was wondering whether marking some of the frequently allocated and used
objects as final impacts perf. It's not necessary; I will revert it.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161358991
  
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String 
getExecutorType(WorkerTopologyContext workerTopologyContex
 }
 }
 
+public Queue getPendingEmits() {
+return pendingEmits;
+}
+
 /**
  * separated from mkExecutor in order to replace executor transfer in 
executor data for testing
  */
 public ExecutorShutdown execute() throws Exception {
 LOG.info("Loading executor tasks " + componentId + ":" + 
executorId);
 
-registerBackpressure();
-Utils.SmartThread systemThreads =
-Utils.asyncLoop(executorTransfer, 
executorTransfer.getName(), reportErrorDie);
-
 String handlerName = componentId + "-executor" + executorId;
-Utils.SmartThread handlers =
+Utils.SmartThread handler =
 Utils.asyncLoop(this, false, reportErrorDie, 
Thread.NORM_PRIORITY, true, true, handlerName);
 setupTicks(StatsUtil.SPOUT.equals(type));
 
 LOG.info("Finished loading executor " + componentId + ":" + 
executorId);
-return new ExecutorShutdown(this, 
Lists.newArrayList(systemThreads, handlers), idToTask);
+return new ExecutorShutdown(this, Lists.newArrayList(handler), 
idToTask);
 }
 
 public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws 
Exception;
 
-@SuppressWarnings("unchecked")
 @Override
-public void onEvent(Object event, long seq, boolean endOfBatch) throws 
Exception {
-ArrayList addressedTuples = 
(ArrayList) event;
-for (AddressedTuple addressedTuple : addressedTuples) {
-TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
-int taskId = addressedTuple.getDest();
-if (isDebug) {
-LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
-}
+public void accept(Object event) {
+if (event == JCQueue.INTERRUPT) {
+throw new RuntimeException(new InterruptedException("JCQ 
processing interrupted") );
+}
+AddressedTuple addressedTuple =  (AddressedTuple)event;
+int taskId = addressedTuple.getDest();
+
+TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+if (isDebug) {
+LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
+}
+
+try {
 if (taskId != AddressedTuple.BROADCAST_DEST) {
 tupleActionFn(taskId, tuple);
 } else {
 for (Integer t : taskIds) {
 tupleActionFn(t, tuple);
 }
 }
+} catch (Exception e) {
+throw new RuntimeException(e);
--- End diff --


It gets caught by [Utils.asyncLoop()](https://github.com/roshannaik/storm/blob/STORM-2306-2/storm-client/src/jvm/org/apache/storm/utils/Utils.java#L351)
of the BoltExecutor or SpoutExecutor thread.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161358833
  
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String 
getExecutorType(WorkerTopologyContext workerTopologyContex
 }
 }
 
+public Queue getPendingEmits() {
+return pendingEmits;
+}
+
 /**
  * separated from mkExecutor in order to replace executor transfer in 
executor data for testing
  */
 public ExecutorShutdown execute() throws Exception {
 LOG.info("Loading executor tasks " + componentId + ":" + 
executorId);
 
-registerBackpressure();
-Utils.SmartThread systemThreads =
-Utils.asyncLoop(executorTransfer, 
executorTransfer.getName(), reportErrorDie);
-
 String handlerName = componentId + "-executor" + executorId;
-Utils.SmartThread handlers =
+Utils.SmartThread handler =
 Utils.asyncLoop(this, false, reportErrorDie, 
Thread.NORM_PRIORITY, true, true, handlerName);
 setupTicks(StatsUtil.SPOUT.equals(type));
 
 LOG.info("Finished loading executor " + componentId + ":" + 
executorId);
-return new ExecutorShutdown(this, 
Lists.newArrayList(systemThreads, handlers), idToTask);
+return new ExecutorShutdown(this, Lists.newArrayList(handler), 
idToTask);
 }
 
 public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws 
Exception;
 
-@SuppressWarnings("unchecked")
 @Override
-public void onEvent(Object event, long seq, boolean endOfBatch) throws 
Exception {
-ArrayList addressedTuples = 
(ArrayList) event;
-for (AddressedTuple addressedTuple : addressedTuples) {
-TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
-int taskId = addressedTuple.getDest();
-if (isDebug) {
-LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
-}
+public void accept(Object event) {
+if (event == JCQueue.INTERRUPT) {
+throw new RuntimeException(new InterruptedException("JCQ 
processing interrupted") );
--- End diff --

That exception bubbles up to and gets caught by the Utils.asyncLoop in the 
thread that calls the recvQueue.consume() [i.e. a BoltExecutor or SpoutExecutor 
thread]


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161358582
  
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -196,19 +197,21 @@ public static Executor mkExecutor(WorkerState 
workerState, List executorId
 executor.stats = new 
BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
 }
 
+int minId = Integer.MAX_VALUE;
 Map idToTask = new HashMap<>();
 for (Integer taskId : taskIds) {
+minId = Math.min(minId, taskId);
 try {
 Task task = new Task(executor, taskId);
-executor.sendUnanchored(
-task, StormCommon.SYSTEM_STREAM_ID, new 
Values("startup"), executor.getExecutorTransfer());
+task.sendUnanchored( StormCommon.SYSTEM_STREAM_ID, new 
Values("startup"), executor.getExecutorTransfer(), null); // TODO: Roshan: does 
this get delivered/handled anywhere ?
--- End diff --

I don't care too much, will retain if it has some use.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161358493
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java 
---
@@ -155,134 +150,159 @@ public void start() throws Exception {
 
 Subject.doAs(subject, new PrivilegedExceptionAction() {
 @Override public Object run() throws Exception {
-workerState =
-new WorkerState(conf, context, topologyId, 
assignmentId, port, workerId, topologyConf, stateStorage,
+return loadWorker(topologyConf, stateStorage, 
stormClusterState, initCreds, initialCredentials);
+}
+}); // Subject.doAs(...)
+
+}
+
+private Object loadWorker(Map topologyConf, 
IStateStorage stateStorage, IStormClusterState stormClusterState, Map initCreds, Credentials initialCredentials)
+throws Exception {
+workerState =
+new WorkerState(conf, context, topologyId, assignmentId, 
port, workerId, topologyConf, stateStorage,
 stormClusterState);
 
-// Heartbeat here so that worker process dies if this fails
-// it's important that worker heartbeat to supervisor ASAP 
so that supervisor knows
-// that worker is running and moves on
-doHeartBeat();
+// Heartbeat here so that worker process dies if this fails
+// it's important that worker heartbeat to supervisor ASAP so that 
supervisor knows
+// that worker is running and moves on
+doHeartBeat();
 
-executorsAtom = new AtomicReference<>(null);
+executorsAtom = new AtomicReference<>(null);
 
-// launch heartbeat threads immediately so that 
slow-loading tasks don't cause the worker to timeout
-// to the supervisor
-workerState.heartbeatTimer
-.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
-try {
-doHeartBeat();
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-});
+// launch heartbeat threads immediately so that slow-loading tasks 
don't cause the worker to timeout
+// to the supervisor
+workerState.heartbeatTimer
+.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+try {
+doHeartBeat();
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+});
 
-workerState.executorHeartbeatTimer
-.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+workerState.executorHeartbeatTimer
+.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
 Worker.this::doExecutorHeartbeats);
 
-workerState.registerCallbacks();
+workerState.registerCallbacks();
 
-workerState.refreshConnections(null);
+workerState.refreshConnections(null);
 
-workerState.activateWorkerWhenAllConnectionsReady();
+workerState.activateWorkerWhenAllConnectionsReady();
 
-workerState.refreshStormActive(null);
+workerState.refreshStormActive(null);
 
-workerState.runWorkerStartHooks();
+workerState.runWorkerStartHooks();
 
-List newExecutors = new 
ArrayList();
-for (List e : workerState.getExecutors()) {
-if (ConfigUtils.isLocalMode(topologyConf)) {
-newExecutors.add(
-LocalExecutor.mkExecutor(workerState, e, 
initCreds)
-.execute());
-} else {
-newExecutors.add(
-Executor.mkExecutor(workerState, e, initCreds)
-.execute());
-}
-}
-executorsAtom.set(newExecutors);
+List execs = new ArrayList<>();
+for (List e : workerState.getExecutors()) {
+if (ConfigUtils.isLocalMode(topologyConf)) {
+Executor executor = LocalExecutor.mkExecutor(workerState, 
e, initCreds);
+execs.add( 

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161358387
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
 ---
@@ -22,24 +22,25 @@
 
 public class SpoutThrottlingMetrics extends BuiltinMetrics {
 private final CountMetric skippedMaxSpoutMs = new CountMetric();
-private final CountMetric skippedThrottleMs = new CountMetric();
 private final CountMetric skippedInactiveMs = new CountMetric();
+private final CountMetric skippedBackPressureMs = new CountMetric();
 
 public SpoutThrottlingMetrics() {
 metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs);
-metricMap.put("skipped-throttle-ms", skippedThrottleMs);
 metricMap.put("skipped-inactive-ms", skippedInactiveMs);
+metricMap.put("skipped-backpressure-ms", skippedBackPressureMs);
--- End diff --

fixed


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161357014
  
--- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java ---
@@ -193,6 +210,24 @@ public void run() {
 });
 }
 
+/**
+ * Schedule a function to run recurrently
+ * @param delayMs the number of millis to delay before running the 
function
+ * @param recurMs the time between each invocation
+ * @param func the function to run
+ */
+public void scheduleRecurringMs(long delayMs, final long recurMs, 
final Runnable func) {
+scheduleMs(delayMs, new Runnable() {
--- End diff --

Doesn't seem feasible due to the `this` reference inside the anonymous Runnable: a
lambda cannot refer to itself via `this` (it would resolve to the enclosing
instance instead).
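
To illustrate the point (not the actual StormTimer code, the body here is assumed):
the anonymous Runnable re-schedules itself through `this`, which a lambda cannot
express:

```java
Runnable recurring = new Runnable() {
    @Override
    public void run() {
        func.run();
        scheduleMs(recurMs, this);   // `this` is this Runnable; in a lambda it would be the enclosing StormTimer
    }
};
scheduleMs(delayMs, recurring);
```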


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-12 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r161356532
  
--- Diff: docs/Performance.md ---
@@ -0,0 +1,132 @@
+---
--- End diff --

I am putting a link to this doc in Concepts.md. If you have a better place in
mind, please let me know.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r160066094
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -177,6 +195,35 @@ public BuiltinMetrics getBuiltInMetrics() {
 return builtInMetrics;
 }
 
+
+// Non Blocking call. If cannot emit to destination immediately, such 
tuples will be added to `pendingEmits` argument
+public void sendUnanchored(String stream, List values, 
ExecutorTransfer transfer, Queue pendingEmits) {
+Tuple tuple = getTuple(stream, values);
+List tasks = getOutgoingTasks(stream, values);
+for (Integer t : tasks) {
+AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
+transfer.tryTransfer(addressedTuple, pendingEmits);
+}
+}
+
+/**
+ * Send sampled data to the eventlogger if the global or component 
level debug flag is set (via nimbus api).
+ */
+public void sendToEventLogger(Executor executor, List values,
--- End diff --

I reread the code and found that the sampling percentage can be changed. I was
thinking about reducing the random.nextDouble() calls, but in this case we may not
be able to do that. Please ignore my previous comment.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r160065976
  
--- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java ---
@@ -24,50 +24,46 @@
 import org.apache.storm.task.GeneralTopologyContext;
 
 public class TupleImpl implements Tuple {
-private final List values;
-private final int taskId;
-private final String streamId;
-private final GeneralTopologyContext context;
-private final MessageId id;
+private List values;
+private int taskId;
+private String streamId;
+private GeneralTopologyContext context;
+private MessageId id;
+private final String srcComponent;
 private Long _processSampleStartTime;
 private Long _executeSampleStartTime;
 private long _outAckVal = 0;
-
+
 public TupleImpl(Tuple t) {
 this.values = t.getValues();
 this.taskId = t.getSourceTask();
 this.streamId = t.getSourceStreamId();
 this.id = t.getMessageId();
 this.context = t.getContext();
-if (t instanceof TupleImpl) {
+this.srcComponent = t.getSourceComponent();
+try {
 TupleImpl ti = (TupleImpl) t;
 this._processSampleStartTime = ti._processSampleStartTime;
 this._executeSampleStartTime = ti._executeSampleStartTime;
 this._outAckVal = ti._outAckVal;
+} catch (ClassCastException e) {
+// ignore ... if t is not a TupleImpl type .. faster than 
checking and then casting
 }
 }
 
-public TupleImpl(GeneralTopologyContext context, List values, 
int taskId, String streamId, MessageId id) {
+public TupleImpl(GeneralTopologyContext context, List values, 
String srcComponent, int taskId, String streamId, MessageId id) {
 this.values = Collections.unmodifiableList(values);
 this.taskId = taskId;
 this.streamId = streamId;
 this.id = id;
 this.context = context;
-
-String componentId = context.getComponentId(taskId);
-Fields schema = context.getComponentOutputFields(componentId, 
streamId);
-if(values.size()!=schema.size()) {
-throw new IllegalArgumentException(
-"Tuple created with wrong number of fields. " +
-"Expected " + schema.size() + " fields but got " +
-values.size() + " fields");
-}
--- End diff --

I think having a map associating the (taskId, streamId) pair with the schema's
size may help reduce the slowdown. I know we are trying to avoid map lookups, but
it looks like an acceptable tradeoff to me if we still want to keep the sanity
check.
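
A sketch of that cache idea (hypothetical; in practice the map would probably live
on the GeneralTopologyContext rather than in TupleImpl):

```java
// e.g. a field owned by the topology context, keyed by "taskId/streamId"
private final Map<String, Integer> schemaSizeCache = new ConcurrentHashMap<>();

int expected = schemaSizeCache.computeIfAbsent(taskId + "/" + streamId,
    k -> context.getComponentOutputFields(context.getComponentId(taskId), streamId).size());
if (values.size() != expected) {
    throw new IllegalArgumentException("Tuple created with wrong number of fields. "
        + "Expected " + expected + " fields but got " + values.size() + " fields");
}
```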


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r160011014
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -177,6 +195,35 @@ public BuiltinMetrics getBuiltInMetrics() {
 return builtInMetrics;
 }
 
+
+// Non Blocking call. If cannot emit to destination immediately, such 
tuples will be added to `pendingEmits` argument
+public void sendUnanchored(String stream, List values, 
ExecutorTransfer transfer, Queue pendingEmits) {
+Tuple tuple = getTuple(stream, values);
+List tasks = getOutgoingTasks(stream, values);
+for (Integer t : tasks) {
+AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
+transfer.tryTransfer(addressedTuple, pendingEmits);
+}
+}
+
+/**
+ * Send sampled data to the eventlogger if the global or component 
level debug flag is set (via nimbus api).
+ */
+public void sendToEventLogger(Executor executor, List values,
--- End diff --

@HeartSaVioR you pointed out that some optimizations are possible here, which we
can tackle in another JIRA. Can you elaborate, or capture your thoughts in a JIRA?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r160010894
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -105,19 +134,16 @@ public void reportError(Throwable error) {
 msgId = MessageId.makeUnanchored();
 }
 
-TupleImpl tuple = new 
TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, 
msgId);
-executor.getExecutorTransfer().transfer(t, tuple);
+final TupleImpl tuple = new 
TupleImpl(executor.getWorkerTopologyContext(), values, 
executor.getComponentId(), this.taskId, stream, msgId);
+AddressedTuple adrTuple = new AddressedTuple(t, tuple);
+executor.getExecutorTransfer().tryTransfer(adrTuple, 
executor.getPendingEmits());
 }
 if (isEventLoggers) {
-executor.sendToEventLogger(executor, taskData, values, 
executor.getComponentId(), messageId, random);
+taskData.sendToEventLogger(executor, values, 
executor.getComponentId(), messageId, random, executor.getPendingEmits());
 }
--- End diff --

@HeartSaVioR you pointed out that some optimizations are possible here, which we
can tackle in another JIRA. Can you elaborate, or capture your thoughts in a JIRA?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159969475
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -328,20 +328,22 @@ public static boolean isSystemId(String id) {
  * @return the newly created thread
  * @see Thread
  */
-public static SmartThread asyncLoop(final Callable afn,
-boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
-int priority, final boolean isFactory, boolean 
startImmediately,
-String threadName) {
+public static SmartThread asyncLoop(final Callable afn, boolean 
isDaemon, final Thread.UncaughtExceptionHandler eh,
+int priority, final boolean 
isFactory, boolean startImmediately,
+String threadName) {
 SmartThread thread = new SmartThread(new Runnable() {
 public void run() {
-Object s;
 try {
-Callable fn = isFactory ? (Callable) afn.call() : afn;
-while ((s = fn.call()) instanceof Long) {
-Time.sleepSecs((Long) s);
+final Callable fn = isFactory ? (Callable) 
afn.call() : afn;
+while (true) {
+final Long s = fn.call();
+if (s==null) // then stop running it
+break;
+if (s>0)
+Thread.sleep(s);
--- End diff --

This needs to be Time.sleep if we want simulated time to work properly


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159962361
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
@@ -17,72 +17,124 @@
  */
 package org.apache.storm.executor;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.EventHandler;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.serialization.KryoTupleSerializer;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.DisruptorQueue;
-import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Queue;
 
-public class ExecutorTransfer implements EventHandler, Callable {
+// Every executor has an instance of this class
+public class ExecutorTransfer  {
 private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorTransfer.class);
 
 private final WorkerState workerData;
-private final DisruptorQueue batchTransferQueue;
-private final Map topoConf;
 private final KryoTupleSerializer serializer;
-private final MutableObject cachedEmit;
 private final boolean isDebug;
+private final int producerBatchSz;
+private int remotesBatchSz = 0;
+private int indexingBase = 0;
+private ArrayList localReceiveQueues; // 
[taskId-indexingBase] => queue : List of all recvQs local to this worker
+private ArrayList queuesToFlush; // [taskId-indexingBase] => 
queue, some entries can be null. : outbound Qs for this executor instance
 
-public ExecutorTransfer(WorkerState workerData, DisruptorQueue 
batchTransferQueue, Map topoConf) {
+
+public ExecutorTransfer(WorkerState workerData, Map 
topoConf) {
 this.workerData = workerData;
-this.batchTransferQueue = batchTransferQueue;
-this.topoConf = topoConf;
 this.serializer = new KryoTupleSerializer(topoConf, 
workerData.getWorkerTopologyContext());
-this.cachedEmit = new MutableObject(new ArrayList<>());
 this.isDebug = 
ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
+this.producerBatchSz = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+}
+
+// to be called after all Executor objects in the worker are created 
and before this object is used
+public void initLocalRecvQueues() {
+Integer minTaskId = 
workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
+this.localReceiveQueues = Utils.convertToArray( 
workerData.getLocalReceiveQueues(), minTaskId);
+this.indexingBase = minTaskId;
+this.queuesToFlush = new 
ArrayList(Collections.nCopies(localReceiveQueues.size(), null) );
 }
 
-public void transfer(int task, Tuple tuple) {
-AddressedTuple val = new AddressedTuple(task, tuple);
+// adds addressedTuple to destination Q if it is not full. else adds 
to pendingEmits (if its not null)
+public boolean tryTransfer(AddressedTuple addressedTuple, 
Queue pendingEmits) {
 if (isDebug) {
-LOG.info("TRANSFERRING tuple {}", val);
+LOG.info("TRANSFERRING tuple {}", addressedTuple);
+}
+
+JCQueue localQueue = getLocalQueue(addressedTuple);
+if (localQueue!=null) {
+return tryTransferLocal(addressedTuple, localQueue, 
pendingEmits);
+}  else  {
+if (remotesBatchSz >= producerBatchSz) {
+if ( !workerData.tryFlushRemotes() ) {
+if (pendingEmits != null) {
+pendingEmits.add(addressedTuple);
+}
+return false;
+}
+remotesBatchSz = 0;
--- End diff --

Do we have a race condition here?  I believe that this method can be called 
from multiple different threads, and if so then we now have to worry about 
remotesBatchSz staying consistent.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159969170
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/JCQueue.java ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.internal.RateTracker;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.MpscArrayQueue;
+import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public final class JCQueue implements IStatefulObject {
+private static final Logger LOG = 
LoggerFactory.getLogger(JCQueue.class);
+
+public static final Object INTERRUPT = new Object();
+
+private final ThroughputMeter emptyMeter = new 
ThroughputMeter("EmptyBatch");
--- End diff --

This is never reported anywhere...  Do we want to just delete it and move 
ThroughputMeter to perf?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159956952
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.utils.JCQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+
+public class BackPressureTracker {
+static final Logger LOG = 
LoggerFactory.getLogger(BackPressureTracker.class);
+
+private final Map<Integer, JCQueue> bpTasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
+private final Set<Integer> nonBpTasks = ConcurrentHashMap.newKeySet();
+private final String workerId;
+
+public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
+this.workerId = workerId;
+this.nonBpTasks.addAll(allLocalTasks);// all tasks are 
considered to be not under BP initially
+this.nonBpTasks.remove((int)SYSTEM_TASK_ID);   // not tracking 
system task
+}
+
+/* called by transferLocalBatch() on NettyWorker thread
+ * returns true if an update was recorded, false if taskId is already 
under BP
+ */
+public boolean recordBackpressure(Integer taskId, JCQueue recvQ) {
+if (nonBpTasks.remove(taskId)) {
+bpTasks.put(taskId, recvQ);
+return true;
+}
+return false;
+}
+
+// returns true if there was a change in the BP situation
+public boolean refreshBpTaskList() {
+boolean changed = false;
+LOG.debug("Running Back Pressure status change check");
+for (Iterator<Entry<Integer, JCQueue>> itr = bpTasks.entrySet().iterator(); itr.hasNext(); ) {
+Entry<Integer, JCQueue> entry = itr.next();
+if (entry.getValue().isEmptyOverflow()) {
+// move task from bpTasks to noBpTasks
+nonBpTasks.add(entry.getKey());
+itr.remove();
--- End diff --

It looks like there is a race condition here with `recordBackpressure`. If 
`nonBpTasks.add(entry.getKey())` finishes, a context switch happens, and 
`recordBackpressure` completes fully for the same taskId before `itr.remove()` 
runs, we can end up in an inconsistent state (the task lands in neither 
collection even though its queue may still be under backpressure).
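
Something like a single map with one atomic flag per task would avoid the
two-collection handoff entirely; rough sketch (my own, not this PR's code —
only `JCQueue.isEmptyOverflow()` is taken from the PR):

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.storm.utils.JCQueue;

// Rough sketch: one atomic flag per task means a task can never end up
// missing from (or present in) two collections at once.
public class BackPressureTrackerSketch {
    private final Map<Integer, AtomicBoolean> bpFlags = new ConcurrentHashMap<>();
    private final Map<Integer, JCQueue> recvQs = new ConcurrentHashMap<>();

    /** Returns true only on the not-under-BP -> under-BP transition. */
    public boolean recordBackpressure(Integer taskId, JCQueue recvQ) {
        recvQs.putIfAbsent(taskId, recvQ);
        AtomicBoolean flag = bpFlags.computeIfAbsent(taskId, t -> new AtomicBoolean(false));
        return flag.compareAndSet(false, true);
    }

    /** Returns true if any task transitioned back to not-under-BP. */
    public boolean refreshBpTaskList() {
        boolean changed = false;
        for (Map.Entry<Integer, AtomicBoolean> e : bpFlags.entrySet()) {
            JCQueue q = recvQs.get(e.getKey());
            if (e.getValue().get() && q != null && q.isEmptyOverflow()) {
                changed |= e.getValue().compareAndSet(true, false);
            }
        }
        return changed;
    }
}
```

That only fixes the state consistency, though; the ordering between the
`isEmptyOverflow()` check and the flag flip would still need a careful look.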


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159968476
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java ---
@@ -76,6 +76,32 @@ public static Integer getInt(Object o, Integer 
defaultValue) {
 throw new IllegalArgumentException("Don't know how to convert " + 
o + " to int");
 }
 
+public static Long getLong(Object o) {
+return getLong(o, null);
+}
+
+public static Long getLong(Object o, Long defaultValue) {
+if (null == o) {
+return defaultValue;
+}
+
+if ( o instanceof Long ||
+o instanceof Integer ||
+o instanceof Short ||
+o instanceof Byte) {
+return ((Number) o).longValue();
+} else if (o instanceof Double) {
+final long l = (Long) o;
--- End diff --

This is not going to work here, as a Long can never be larger than MAX_VALUE 
for a long or smaller than MIN_VALUE for a long. Technically I think there may 
be a bug in getInt if the number is larger than a Long can hold.
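
For illustration, the kind of Double branch I would expect (my sketch, not code
from this PR; whether non-integral doubles should be rejected or truncated is
up to you):

```
} else if (o instanceof Double) {
    final double d = (Double) o;
    // 0x1p63 is 2^63 as a double; doubles in [-2^63, 2^63) with no
    // fractional part can be represented exactly as a long
    if (d >= -0x1p63 && d < 0x1p63 && d == Math.floor(d)) {
        return (long) d;
    }
    throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
}
```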


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159969789
  
--- Diff: storm-server/src/main/java/org/apache/storm/Testing.java ---
@@ -712,6 +712,6 @@ public static Tuple testTuple(List values, 
MkTupleParam param) {
 new HashMap<>(),
 new HashMap<>(),
 new AtomicBoolean(false));
-return new TupleImpl(context, values, 1, stream);
+return new TupleImpl(context, values, "component", 1, stream);
--- End diff --

This I think is the cause of some of the test failures


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159954152
  
--- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java ---
@@ -193,6 +210,24 @@ public void run() {
 });
 }
 
+/**
+ * Schedule a function to run recurrently
+ * @param delayMs the number of millis to delay before running the 
function
+ * @param recurMs the time between each invocation
+ * @param func the function to run
+ */
+public void scheduleRecurringMs(long delayMs, final long recurMs, 
final Runnable func) {
+scheduleMs(delayMs, new Runnable() {
--- End diff --

nit could we use a java 8 lambda here instead?

```
scheduleMs(delayMs, () -> {
func.run();
// This avoids a race condition with cancel-timer.
scheduleMs(recurMs, this, true, 0);
});
```


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159957697
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java 
---
@@ -155,134 +150,159 @@ public void start() throws Exception {
 
 Subject.doAs(subject, new PrivilegedExceptionAction() {
 @Override public Object run() throws Exception {
-workerState =
-new WorkerState(conf, context, topologyId, 
assignmentId, port, workerId, topologyConf, stateStorage,
+return loadWorker(topologyConf, stateStorage, 
stormClusterState, initCreds, initialCredentials);
+}
+}); // Subject.doAs(...)
+
+}
+
+private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
+throws Exception {
+workerState =
+new WorkerState(conf, context, topologyId, assignmentId, 
port, workerId, topologyConf, stateStorage,
 stormClusterState);
 
-// Heartbeat here so that worker process dies if this fails
-// it's important that worker heartbeat to supervisor ASAP 
so that supervisor knows
-// that worker is running and moves on
-doHeartBeat();
+// Heartbeat here so that worker process dies if this fails
+// it's important that worker heartbeat to supervisor ASAP so that 
supervisor knows
+// that worker is running and moves on
+doHeartBeat();
 
-executorsAtom = new AtomicReference<>(null);
+executorsAtom = new AtomicReference<>(null);
 
-// launch heartbeat threads immediately so that 
slow-loading tasks don't cause the worker to timeout
-// to the supervisor
-workerState.heartbeatTimer
-.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
-try {
-doHeartBeat();
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-});
+// launch heartbeat threads immediately so that slow-loading tasks 
don't cause the worker to timeout
+// to the supervisor
+workerState.heartbeatTimer
+.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+try {
+doHeartBeat();
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+});
 
-workerState.executorHeartbeatTimer
-.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+workerState.executorHeartbeatTimer
+.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
 Worker.this::doExecutorHeartbeats);
 
-workerState.registerCallbacks();
+workerState.registerCallbacks();
 
-workerState.refreshConnections(null);
+workerState.refreshConnections(null);
 
-workerState.activateWorkerWhenAllConnectionsReady();
+workerState.activateWorkerWhenAllConnectionsReady();
 
-workerState.refreshStormActive(null);
+workerState.refreshStormActive(null);
 
-workerState.runWorkerStartHooks();
+workerState.runWorkerStartHooks();
 
-List newExecutors = new 
ArrayList();
-for (List e : workerState.getExecutors()) {
-if (ConfigUtils.isLocalMode(topologyConf)) {
-newExecutors.add(
-LocalExecutor.mkExecutor(workerState, e, 
initCreds)
-.execute());
-} else {
-newExecutors.add(
-Executor.mkExecutor(workerState, e, initCreds)
-.execute());
-}
-}
-executorsAtom.set(newExecutors);
+List execs = new ArrayList<>();
+for (List e : workerState.getExecutors()) {
+if (ConfigUtils.isLocalMode(topologyConf)) {
+Executor executor = LocalExecutor.mkExecutor(workerState, 
e, initCreds);
+execs.add( 

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159969910
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
 ---
@@ -79,6 +79,7 @@
 protected final long lowMemoryThresholdMB;
 protected final long mediumMemoryThresholdMb;
 protected final long mediumMemoryGracePeriodMs;
+private static int port = 5006;  // TODO: Roshan: remove this after 
stabilization
--- End diff --

Yes please make sure this is removed, as leaving it in place is a security 
vulnerability.
  


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159954450
  
--- Diff: storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java 
---
@@ -47,7 +47,6 @@
 public static final String STORMS_ROOT = "storms";
 public static final String SUPERVISORS_ROOT = "supervisors";
 public static final String WORKERBEATS_ROOT = "workerbeats";
-public static final String BACKPRESSURE_ROOT = "backpressure";
--- End diff --

In order to support running older topology versions under a newer 2.x 
nimbus, etc., we should still keep around the basic setup and cleanup of the 
backpressure nodes in ZooKeeper, at least until a 3.x release.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159967636
  
--- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java ---
@@ -24,50 +24,46 @@
 import org.apache.storm.task.GeneralTopologyContext;
 
 public class TupleImpl implements Tuple {
-private final List<Object> values;
-private final int taskId;
-private final String streamId;
-private final GeneralTopologyContext context;
-private final MessageId id;
+private List<Object> values;
+private int taskId;
+private String streamId;
+private GeneralTopologyContext context;
+private MessageId id;
+private final String srcComponent;
 private Long _processSampleStartTime;
 private Long _executeSampleStartTime;
 private long _outAckVal = 0;
-
+
 public TupleImpl(Tuple t) {
 this.values = t.getValues();
 this.taskId = t.getSourceTask();
 this.streamId = t.getSourceStreamId();
 this.id = t.getMessageId();
 this.context = t.getContext();
-if (t instanceof TupleImpl) {
+this.srcComponent = t.getSourceComponent();
+try {
 TupleImpl ti = (TupleImpl) t;
 this._processSampleStartTime = ti._processSampleStartTime;
 this._executeSampleStartTime = ti._executeSampleStartTime;
 this._outAckVal = ti._outAckVal;
+} catch (ClassCastException e) {
+// ignore ... if t is not a TupleImpl type .. faster than 
checking and then casting
 }
 }
 
-public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
+public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) {
 this.values = Collections.unmodifiableList(values);
 this.taskId = taskId;
 this.streamId = streamId;
 this.id = id;
 this.context = context;
-
-String componentId = context.getComponentId(taskId);
-Fields schema = context.getComponentOutputFields(componentId, 
streamId);
-if(values.size()!=schema.size()) {
-throw new IllegalArgumentException(
-"Tuple created with wrong number of fields. " +
-"Expected " + schema.size() + " fields but got " +
-values.size() + " fields");
-}
--- End diff --

I know this slows things down, but it is a good sanity check. Would it be 
possible to have a way to turn this check on via config, e.g. in local mode? 
We would have to cache the flag, though, because reading the conf on every 
tuple might cost as much as doing the check itself.
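
Something like this is the kind of caching I mean (sketch only; the config key
name is invented and the wiring to reach the conf from the tuple constructor is
left open):

```
// Sketch: read the flag once, then the per-tuple cost is a single int check.
private static volatile int validateFields = -1;   // -1 = not read from conf yet

private static boolean shouldValidate(Map<String, Object> topoConf) {
    if (validateFields < 0) {
        Object v = topoConf.get("topology.testing.validate.tuple.fields");  // invented key
        validateFields = Boolean.TRUE.equals(v) ? 1 : 0;
    }
    return validateFields == 1;
}
```

The old schema-size check would then only run when `shouldValidate(...)` returns
true, e.g. in local mode.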


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159949952
  
--- Diff: docs/Performance.md ---
@@ -0,0 +1,132 @@
+---
--- End diff --

Great documentation, but can we have some of the other docs link to it?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159958378
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java 
---
@@ -155,134 +150,159 @@ public void start() throws Exception {
 
 Subject.doAs(subject, new PrivilegedExceptionAction() {
 @Override public Object run() throws Exception {
-workerState =
-new WorkerState(conf, context, topologyId, 
assignmentId, port, workerId, topologyConf, stateStorage,
+return loadWorker(topologyConf, stateStorage, 
stormClusterState, initCreds, initialCredentials);
+}
+}); // Subject.doAs(...)
+
+}
+
+private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
+throws Exception {
+workerState =
+new WorkerState(conf, context, topologyId, assignmentId, 
port, workerId, topologyConf, stateStorage,
 stormClusterState);
 
-// Heartbeat here so that worker process dies if this fails
-// it's important that worker heartbeat to supervisor ASAP 
so that supervisor knows
-// that worker is running and moves on
-doHeartBeat();
+// Heartbeat here so that worker process dies if this fails
+// it's important that worker heartbeat to supervisor ASAP so that 
supervisor knows
+// that worker is running and moves on
+doHeartBeat();
 
-executorsAtom = new AtomicReference<>(null);
+executorsAtom = new AtomicReference<>(null);
 
-// launch heartbeat threads immediately so that 
slow-loading tasks don't cause the worker to timeout
-// to the supervisor
-workerState.heartbeatTimer
-.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
-try {
-doHeartBeat();
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-});
+// launch heartbeat threads immediately so that slow-loading tasks 
don't cause the worker to timeout
+// to the supervisor
+workerState.heartbeatTimer
+.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+try {
+doHeartBeat();
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+});
 
-workerState.executorHeartbeatTimer
-.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+workerState.executorHeartbeatTimer
+.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
 Worker.this::doExecutorHeartbeats);
 
-workerState.registerCallbacks();
+workerState.registerCallbacks();
 
-workerState.refreshConnections(null);
+workerState.refreshConnections(null);
 
-workerState.activateWorkerWhenAllConnectionsReady();
+workerState.activateWorkerWhenAllConnectionsReady();
 
-workerState.refreshStormActive(null);
+workerState.refreshStormActive(null);
 
-workerState.runWorkerStartHooks();
+workerState.runWorkerStartHooks();
 
-List newExecutors = new 
ArrayList();
-for (List e : workerState.getExecutors()) {
-if (ConfigUtils.isLocalMode(topologyConf)) {
-newExecutors.add(
-LocalExecutor.mkExecutor(workerState, e, 
initCreds)
-.execute());
-} else {
-newExecutors.add(
-Executor.mkExecutor(workerState, e, initCreds)
-.execute());
-}
-}
-executorsAtom.set(newExecutors);
+List execs = new ArrayList<>();
+for (List e : workerState.getExecutors()) {
+if (ConfigUtils.isLocalMode(topologyConf)) {
+Executor executor = LocalExecutor.mkExecutor(workerState, 
e, initCreds);
+execs.add( 

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159963018
  
--- Diff: storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java ---
@@ -23,7 +23,7 @@
 import java.io.Serializable;
 import java.util.List;
 
-public class TupleInfo implements Serializable {
+public final class TupleInfo implements Serializable {
--- End diff --

Why do we want this to be final?  Just curious.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159955584
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
 ---
@@ -22,24 +22,25 @@
 
 public class SpoutThrottlingMetrics extends BuiltinMetrics {
 private final CountMetric skippedMaxSpoutMs = new CountMetric();
-private final CountMetric skippedThrottleMs = new CountMetric();
 private final CountMetric skippedInactiveMs = new CountMetric();
+private final CountMetric skippedBackPressureMs = new CountMetric();
 
 public SpoutThrottlingMetrics() {
 metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs);
-metricMap.put("skipped-throttle-ms", skippedThrottleMs);
 metricMap.put("skipped-inactive-ms", skippedInactiveMs);
+metricMap.put("skipped-backpressure-ms", skippedBackPressureMs);
--- End diff --

./docs/Metrics.md describes these. It should be updated to remove 
skipped-throttle-ms and replace it with skipped-backpressure-ms (and preferably 
mention that in older versions of Storm skipped-throttle-ms was the closest 
equivalent).


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159960326
  
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String 
getExecutorType(WorkerTopologyContext workerTopologyContex
 }
 }
 
+public Queue<AddressedTuple> getPendingEmits() {
+return pendingEmits;
+}
+
 /**
  * separated from mkExecutor in order to replace executor transfer in 
executor data for testing
  */
 public ExecutorShutdown execute() throws Exception {
 LOG.info("Loading executor tasks " + componentId + ":" + 
executorId);
 
-registerBackpressure();
-Utils.SmartThread systemThreads =
-Utils.asyncLoop(executorTransfer, 
executorTransfer.getName(), reportErrorDie);
-
 String handlerName = componentId + "-executor" + executorId;
-Utils.SmartThread handlers =
+Utils.SmartThread handler =
 Utils.asyncLoop(this, false, reportErrorDie, 
Thread.NORM_PRIORITY, true, true, handlerName);
 setupTicks(StatsUtil.SPOUT.equals(type));
 
 LOG.info("Finished loading executor " + componentId + ":" + 
executorId);
-return new ExecutorShutdown(this, 
Lists.newArrayList(systemThreads, handlers), idToTask);
+return new ExecutorShutdown(this, Lists.newArrayList(handler), 
idToTask);
 }
 
 public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws 
Exception;
 
-@SuppressWarnings("unchecked")
 @Override
-public void onEvent(Object event, long seq, boolean endOfBatch) throws 
Exception {
-ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
-for (AddressedTuple addressedTuple : addressedTuples) {
-TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
-int taskId = addressedTuple.getDest();
-if (isDebug) {
-LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
-}
+public void accept(Object event) {
+if (event == JCQueue.INTERRUPT) {
+throw new RuntimeException(new InterruptedException("JCQ 
processing interrupted") );
--- End diff --

Could you explain a bit more when JCQueue.INTERRUPT happens and where this 
exception is caught/handled? Almost everywhere else that we use an 
InterruptedException it is treated as the worker going down in an orderly 
manner, so the thread just ends without complaining. I just want to be sure 
that we don't accidentally shut down part of the worker in some odd cases.
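
To be concrete about the convention I mean (generic sketch with placeholder
names, not this PR's actual handler):

```
// Sketch: an interrupt means "worker shutting down", so end the thread quietly;
// anything else is a real failure and should reach the error/die callback.
try {
    consumeBatch();   // placeholder for the queue-consuming loop body
} catch (RuntimeException e) {
    if (e.getCause() instanceof InterruptedException
            || Thread.currentThread().isInterrupted()) {
        LOG.info("Consumer interrupted; treating as orderly shutdown");
        return;        // do not report the error or kill the worker
    }
    throw e;
}
```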


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159960500
  
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String 
getExecutorType(WorkerTopologyContext workerTopologyContex
 }
 }
 
+public Queue<AddressedTuple> getPendingEmits() {
+return pendingEmits;
+}
+
 /**
  * separated from mkExecutor in order to replace executor transfer in 
executor data for testing
  */
 public ExecutorShutdown execute() throws Exception {
 LOG.info("Loading executor tasks " + componentId + ":" + 
executorId);
 
-registerBackpressure();
-Utils.SmartThread systemThreads =
-Utils.asyncLoop(executorTransfer, 
executorTransfer.getName(), reportErrorDie);
-
 String handlerName = componentId + "-executor" + executorId;
-Utils.SmartThread handlers =
+Utils.SmartThread handler =
 Utils.asyncLoop(this, false, reportErrorDie, 
Thread.NORM_PRIORITY, true, true, handlerName);
 setupTicks(StatsUtil.SPOUT.equals(type));
 
 LOG.info("Finished loading executor " + componentId + ":" + 
executorId);
-return new ExecutorShutdown(this, 
Lists.newArrayList(systemThreads, handlers), idToTask);
+return new ExecutorShutdown(this, Lists.newArrayList(handler), 
idToTask);
 }
 
 public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws 
Exception;
 
-@SuppressWarnings("unchecked")
 @Override
-public void onEvent(Object event, long seq, boolean endOfBatch) throws 
Exception {
-ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
-for (AddressedTuple addressedTuple : addressedTuples) {
-TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
-int taskId = addressedTuple.getDest();
-if (isDebug) {
-LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
-}
+public void accept(Object event) {
+if (event == JCQueue.INTERRUPT) {
+throw new RuntimeException(new InterruptedException("JCQ 
processing interrupted") );
+}
+AddressedTuple addressedTuple =  (AddressedTuple)event;
+int taskId = addressedTuple.getDest();
+
+TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+if (isDebug) {
+LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
+}
+
+try {
 if (taskId != AddressedTuple.BROADCAST_DEST) {
 tupleActionFn(taskId, tuple);
 } else {
 for (Integer t : taskIds) {
 tupleActionFn(t, tuple);
 }
 }
+} catch (Exception e) {
+throw new RuntimeException(e);
--- End diff --

Where is this exception handled?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159966670
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java 
---
@@ -20,6 +20,7 @@
 import org.apache.storm.Config;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.messaging.netty.BackPressureStatus;
--- End diff --

Is this change needed?  Is this even used here?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159959648
  
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -196,19 +197,21 @@ public static Executor mkExecutor(WorkerState 
workerState, List executorId
 executor.stats = new 
BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
 }
 
+int minId = Integer.MAX_VALUE;
+Map<Integer, Task> idToTask = new HashMap<>();
 for (Integer taskId : taskIds) {
+minId = Math.min(minId, taskId);
 try {
 Task task = new Task(executor, taskId);
-executor.sendUnanchored(
-task, StormCommon.SYSTEM_STREAM_ID, new 
Values("startup"), executor.getExecutorTransfer());
+task.sendUnanchored( StormCommon.SYSTEM_STREAM_ID, new 
Values("startup"), executor.getExecutorTransfer(), null); // TODO: Roshan: does 
this get delivered/handled anywhere ?
--- End diff --

Answer: 
storm-core/test/clj/integration/org/apache/storm/integration_test.clj

It can sometimes be helpful for debugging when you have topology.debug 
enabled, but it is not that critical and can probably be removed if you want 
to. If not, please remove the TODO comment.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r159968772
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/RunningAvg.java ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+public class RunningAvg {
+
+private long n = 0;
+private double oldM, newM, oldS, newS;
+private String name;
+public static int printFreq = 20_000_000;
+private boolean disable;
+private long count = 0;
+
+public RunningAvg(String name, boolean disable) {
+this(name, printFreq, disable);
+}
+
+public RunningAvg(String name, int printFreq) {
+this(name, printFreq, false);
+}
+
+public RunningAvg(String name, int printFreq, boolean disable) {
+this.name = name + "_" + Thread.currentThread().getName();
+this.printFreq = printFreq;
+this.disable = disable;
+}
+
+public void clear() {
+n = 0;
+}
+
+public void pushLatency(long startMs) {
+push(System.currentTimeMillis() - startMs);
+}
+
+public void push(long x) {
+if (disable) {
+return;
+}
+
+n++;
+
+if (n == 1) {
+oldM = newM = x;
+oldS = 0;
+} else {
+newM = oldM + (x - oldM) / n;
+newS = oldS + (x - oldM) * (x - newM);
+
+// set up for next iteration
+oldM = newM;
+oldS = newS;
+}
+if (++count == printFreq) {
+System.err.printf("  ***> %s - %,.2f\n", name, mean());
--- End diff --

Can we log this instead of printing it? Also, could you file a follow-on JIRA 
so that when we move to a better metrics implementation we use that instead of 
printing/logging?
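
Something as simple as this would do it (sketch, with the usual org.slf4j
imports):

```
private static final Logger LOG = LoggerFactory.getLogger(RunningAvg.class);

// in push(long x), in place of the System.err.printf call:
if (++count == printFreq) {
    LOG.info("{} running mean: {}", name, String.format("%,.2f", mean()));
}
```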


---