[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r168057328 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java --- @@ -22,19 +22,23 @@ import org.apache.storm.executor.TupleInfo; import org.apache.storm.spout.ISpout; import org.apache.storm.spout.ISpoutOutputCollector; +import org.apache.storm.tuple.AddressedTuple; import org.apache.storm.tuple.MessageId; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; import org.apache.storm.utils.MutableLong; import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Random; +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally --- End diff -- Done. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r168046999 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java (same hunk as quoted above) +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally --- End diff -- Yes please, that's the intent. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r168044307 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java (same hunk as quoted above) +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally --- End diff -- OK. Please let me know if you plan to figure this out in the time frame of Storm 2.0.0; I'll add it to the epic for releasing Storm 2.0.0. Thanks! ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r168037754 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java (same hunk as quoted above) +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally --- End diff -- It is to figure out what we will have for Storm 2.0... since thereafter we cannot make any breaking changes until 3.0, even if we would like to. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r168034925 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java (same hunk as quoted above) +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally --- End diff -- I thought STORM-2945 was filed to find a way to support background emit without external synchronization, so it may well remain unresolved in 2.x. If you intended STORM-2945 to document how to enable background emit with the current state, please ignore the comment here. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r168033940 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java (same hunk as quoted above) +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally --- End diff -- To nail down and document the concurrent emits semantics I had opened [STORM-2945](https://issues.apache.org/jira/browse/STORM-2945) ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r168033466 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.messaging.netty; + +import org.apache.storm.serialization.KryoValuesDeserializer; +import org.apache.storm.serialization.KryoValuesSerializer; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; + +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation +public class BackPressureStatus { +public static final short IDENTIFIER = (short)-600; +private static final int SIZE_OF_ID = 2; // size if IDENTIFIER +private static final int SIZE_OF_INT = 4; + +private static AtomicLong bpCount = new AtomicLong(0); + +public String workerId; +public final long id; // monotonically increasing id --- End diff -- OK thanks for clarification. I thought it as some kinds of guarantee we should ensure. 
Maybe better to make clear that it's not a requirement and is only for debugging purposes. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r168032846 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java (same hunk as quoted above) +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally --- End diff -- nit: better to make this a javadoc comment so that it can be exposed in more ways. As @revans2 stated, I also think this is a good assumption for a spout, but it would be even better to add the restriction to any documentation we have. That's just my 2 cents, not a blocker. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r168026607 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java (same file header as quoted above) +public final long id; // monotonically increasing id --- End diff -- It's only for debugging purposes, so that we can correlate sent & received msgs.
I have used it to measure the latency involved in the transmission of BackPressureStatus msgs. ---
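The debug-only id scheme discussed above can be sketched in isolation. The following is a hypothetical standalone illustration, not the actual BackPressureStatus code; only the idea of a static `AtomicLong` counter is taken from the quoted diff. The ids are unique and increasing within one JVM, which is exactly what makes them usable for correlating sent and received messages in logs, but no stronger guarantee is implied.

```java
import java.util.concurrent.atomic.AtomicLong;

public class DebugSeqIdDemo {
    // Static counter shared by all instances in this JVM; ids are unique and
    // monotonically increasing here, but carry no cross-restart guarantee.
    private static final AtomicLong bpCount = new AtomicLong(0);

    public final long id;

    public DebugSeqIdDemo() {
        // getAndIncrement() is atomic, so concurrent constructors still get
        // distinct ids without any locking.
        this.id = bpCount.getAndIncrement();
    }

    public static void main(String[] args) {
        DebugSeqIdDemo first = new DebugSeqIdDemo();
        DebugSeqIdDemo second = new DebugSeqIdDemo();
        // A later message always carries a larger id, so matching ids on the
        // sender and receiver side lines up the two log streams.
        System.out.println(second.id > first.id); // prints "true"
    }
}
```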
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167814459 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java (same file header as quoted above) +public class BackPressureStatus implements java.io.Serializable { --- End diff -- Thanks for the detailed comment. Made the changes and tested them as well. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167807269 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java --- @@ -44,14 +48,15 @@ private final Boolean isEventLoggers; private final Boolean isDebug; private final RotatingMap pending; +private TupleInfo globalTupleInfo = new TupleInfo(); // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed). --- End diff -- Getting the id of the current thread involves a call to Thread.currentThread() which is quite [expensive](http://www.jutils.com/checks/performance.html)... so it is not a good way to determine whether or not to take the fast path. As a compromise, I am introducing that check only if topology.debug is enabled. This mode could be used to do any checks in dev mode that are unnecessary or too expensive to do repeatedly in production. I have opened [STORM-2945](https://issues.apache.org/jira/browse/STORM-2945) to nail down and document background emit support; we can document both spout & bolt semantics together in the same document. ---
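The compromise described above — pay for the `Thread.currentThread()` identity check only when debug mode is on — could look roughly like the following. Everything here, including the class and field names, is a hypothetical sketch rather than the actual collector code; `isDebug` stands in for the `topology.debug` setting.

```java
public class EmitThreadCheck {
    private final boolean isDebug;       // stand-in for the topology.debug config
    private final Thread expectedThread; // the executor thread emits are expected from

    public EmitThreadCheck(boolean isDebug) {
        this.isDebug = isDebug;
        // Captured once at construction time, on the owning executor thread.
        this.expectedThread = Thread.currentThread();
    }

    /** Returns true if the (debug-only) emit-thread sanity check passes. */
    public boolean checkEmitThread() {
        // In production mode the check is skipped entirely, so the fast path
        // never pays the cost of Thread.currentThread().
        if (!isDebug) {
            return true;
        }
        return Thread.currentThread() == expectedThread;
    }
}
```

With `isDebug == false` the method short-circuits before ever calling `Thread.currentThread()`, which is the whole point of the compromise: the expensive check exists only where a misbehaving background-thread emit would be caught during development.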
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167800249 --- Diff: storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java --- @@ -194,7 +194,12 @@ public void reportError(Throwable t) { public void emitDirect(int task, String stream, List values, Object id) { throw new UnsupportedOperationException("Trident does not support direct streams"); } - + +@Override +public void flush() { +//NOOP //TODO: Roshan: validate if this is OK --- End diff -- needs to flush. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167625247 --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java --- @@ -40,109 +40,117 @@ import org.apache.storm.nimbus.NimbusInfo; public interface IStormClusterState { -public List assignments(Runnable callback); +List assignments(Runnable callback); -public Assignment assignmentInfo(String stormId, Runnable callback); +Assignment assignmentInfo(String stormId, Runnable callback); -public VersionedData assignmentInfoWithVersion(String stormId, Runnable callback); +VersionedData assignmentInfoWithVersion(String stormId, Runnable callback); -public Integer assignmentVersion(String stormId, Runnable callback) throws Exception; +Integer assignmentVersion(String stormId, Runnable callback) throws Exception; -public List blobstoreInfo(String blobKey); +List blobstoreInfo(String blobKey); -public List nimbuses(); +List nimbuses(); -public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary); +void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary); -public List activeStorms(); +List activeStorms(); /** * Get a storm base for a topology * @param stormId the id of the topology * @param callback something to call if the data changes (best effort) * @return the StormBase or null if it is not alive. 
*/ -public StormBase stormBase(String stormId, Runnable callback); +StormBase stormBase(String stormId, Runnable callback); -public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port); +ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port); -public List getWorkerProfileRequests(String stormId, NodeInfo nodeInfo); +List getWorkerProfileRequests(String stormId, NodeInfo nodeInfo); -public List getTopologyProfileRequests(String stormId); +List getTopologyProfileRequests(String stormId); -public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest); +void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest); -public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest); +void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest); -public Map executorBeats(String stormId, Map executorNodePort); +Map executorBeats(String stormId, Map executorNodePort); -public List supervisors(Runnable callback); +List supervisors(Runnable callback); -public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist +SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist -public void setupHeatbeats(String stormId); +void setupHeatbeats(String stormId); -public void teardownHeartbeats(String stormId); +void teardownHeartbeats(String stormId); -public void teardownTopologyErrors(String stormId); +void teardownTopologyErrors(String stormId); -public List heartbeatStorms(); +List heartbeatStorms(); -public List errorTopologies(); +List errorTopologies(); -public List backpressureTopologies(); +/** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */ --- End diff -- done. 3.0.0 ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167402753 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java (same file header as quoted above) +public class BackPressureStatus implements java.io.Serializable { --- End diff -- ControlMessage, MessageBatch and SaslMessageToken are handled explicitly by MessageDecoder and MessageEncoder using the buffer and read methods. When ControlMessage, SaslMessageToken, or MessageBatch write themselves out to a buffer they do not use kryo to do it.
https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java#L60-L64 https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java#L86-L101 https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java#L80-L93 It is a hand coded protocol (for good or bad). The messages inside MessageBatch have already been serialized using kryo elsewhere in the pipeline. For the load messages we hid them as a tuple inside a MessageBatch. We did this so we could do a rolling upgrade with it, but it is an ugly hack. BackPressureStatus is serialized/deserialized using MessageEncoder/MessageDecoder, but it uses kryo internally to do it. So please remove Serializable from BackPressureStatus. Register it with kryo. Do not remove `BackPressureStatus.buffer()` nor `BackPressureStatus.read()`. I think the changes to KryoTupleSerializer to send a BackPressureState can be removed assuming no one is calling them directly. The simplest way to test this is to run a topology with multiple workers and `topology.fall.back.on.java.serialization=false`. This is a config that makes it so kryo does not fall back to java serialization when trying to use kryo. For normal tuples/end users you can set `topology.testing.always.try.serialize=true` and every tuple emitted will be serialized, even in local mode. This is a way to unit test that you have setup kryo appropriately, but this BackPressureStatus is a special message so we need to do it differently. ---
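The hand-coded protocol described above — a type identifier read first by the decoder, followed by a length-prefixed payload — can be illustrated with plain `java.nio`. This is a simplified stand-in, not the real implementation: Storm produces the payload with kryo and works with Netty ChannelBuffers, and only the `-600` identifier value is taken from the quoted code; plain UTF-8 bytes stand in for the kryo payload here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FramingSketch {
    public static final short IDENTIFIER = (short) -600; // message-type tag, as in BackPressureStatus

    // Mirrors the shape of a buffer() method: tag, payload length, payload bytes.
    public static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + payload.length);
        buf.putShort(IDENTIFIER);   // the decoder dispatches on this before anything else
        buf.putInt(payload.length); // explicit length, since the payload is opaque bytes
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Mirrors the shape of a read() method: consume the tag, check it, then
    // read length + payload.
    public static String read(ByteBuffer buf) {
        short tag = buf.getShort();
        if (tag != IDENTIFIER) {
            throw new IllegalStateException("unexpected message type: " + tag);
        }
        byte[] payload = new byte[buf.getInt()];
        buf.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer framed = frame("worker-1".getBytes(StandardCharsets.UTF_8));
        System.out.println(read(framed)); // prints "worker-1"
    }
}
```

The negative identifier is what lets the decoder tell control/back-pressure messages apart from ordinary task ids in the byte stream, which is why the tag must be written and read before anything else.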
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167388018 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java (same file header as quoted above) +public class BackPressureStatus implements java.io.Serializable { --- End diff -- Hmm, interesting. Wondering why ControlMessage, MessageBatch & SaslMessageToken don't follow the same pattern of registering? Can you confirm that you are suggesting the following steps? - Remove the inheritance from the Serializable interface - Register BackPressureStatus.class with kryo - Remove the BackPressureStatus.buffer() and BackPressureStatus.read() methods? ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167387844 --- Diff: pom.xml --- @@ -259,6 +259,7 @@ 1.11 4.3.3 0.2.4 +2.0.1 --- End diff -- I guess I misread things, please ignore this comment. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167387808 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java (same globalTupleInfo hunk as quoted above) --- End diff -- Can we do a sanity check for the fast path + documentation? Check if the thread id is the same as the id of the main thread we expect emits to come from. If so we go with the fast path, if not we have a thread local or do some kind of locking + documentation about why you never want the spout to emit from a background thread. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167387609 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java (same file header as quoted above) +public class BackPressureStatus implements java.io.Serializable { --- End diff -- https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java#L66-L74 ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167387629 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java (same file header as quoted above) +public class BackPressureStatus implements java.io.Serializable { --- End diff -- You should be able to get away with just registering the class and not providing a serializer. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167385890 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java (same file header as quoted above) +public class BackPressureStatus implements java.io.Serializable { --- End diff -- I was not aware of that. Do we have an example of what I should do instead? ---
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167385590 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java --- @@ -44,14 +48,15 @@ private final Boolean isEventLoggers; private final Boolean isDebug; private final RotatingMap pending; +private TupleInfo globalTupleInfo = new TupleInfo(); // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed). --- End diff -- Accessing the Thread Local (TL) instance via ThreadLocal.get() typically involves a map lookup behind the scenes. **Related Note:** I reluctantly used TL for latching on to the JCQueue.BatchInserter instance for the producers to JCQueue. Reluctant, since I noticed a perf hit when doing some targeted microbenchmarking. I used it anyway because it was a perf improvement over the ConcurrentHashMap employed in Disruptor, and eliminating TL needed a bigger change to the interface and producers. I think it is possible to achieve a TL-free JCQueue and gain some perf, perhaps in a follow-up JIRA. Although many decisions were measured, due to scope it was not feasible to measure each one. So, in the critical path, I have taken this general approach: - avoid locks & synchronization, and try using lock/wait-free approaches where synchronization is unavoidable - avoid map lookups and object creation This was a case of avoiding synchronization, (TL) map lookups & object allocation. ---
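To make the two access patterns concrete, here is a tiny JDK-only sketch (the `BatchInserter` stand-in is hypothetical, not JCQueue's real inner class) contrasting paying `ThreadLocal.get()` on every publish with latching onto the instance once and keeping a plain reference on the hot path:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for JCQueue's per-producer batch inserter.
class BatchInserter {
    final List<Object> batch = new ArrayList<>();
    void add(Object o) { batch.add(o); }
}

public class TlVsHandle {
    // Pattern discussed above: each producer thread finds its inserter via a ThreadLocal.
    public static final ThreadLocal<BatchInserter> TL = ThreadLocal.withInitial(BatchInserter::new);

    public static void main(String[] args) {
        // ThreadLocal route: every publish pays a ThreadLocal.get(),
        // which is a lookup in the thread's thread-local map.
        for (int i = 0; i < 3; i++) {
            TL.get().add(i);
        }

        // TL-free route: resolve once, then use a plain reference on the hot path.
        BatchInserter handle = TL.get();
        for (int i = 3; i < 6; i++) {
            handle.add(i);
        }

        // Both routes hit the same per-thread instance.
        System.out.println(TL.get().batch.size()); // prints: 6
    }
}
```

Eliminating the per-call lookup is the "bigger change to the interface and producers" mentioned above: each producer would need to hold and pass its own handle instead of letting the queue look it up.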
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167383283 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Acker.java --- @@ -99,17 +100,19 @@ public void execute(Tuple input) { pending.put(id, curr); } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { resetTimeout = true; -if (curr != null) { +if (curr == null) { --- End diff -- Unintentional. This appears to be due to a mistake, most likely made while resolving merge conflicts when I rebased my code to master sometime in Dec 2017. Will revert. Thanks for catching it. ---
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167381302 --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java --- @@ -40,109 +40,117 @@ import org.apache.storm.nimbus.NimbusInfo; public interface IStormClusterState { -public List assignments(Runnable callback); +List assignments(Runnable callback); -public Assignment assignmentInfo(String stormId, Runnable callback); +Assignment assignmentInfo(String stormId, Runnable callback); -public VersionedData assignmentInfoWithVersion(String stormId, Runnable callback); +VersionedData assignmentInfoWithVersion(String stormId, Runnable callback); -public Integer assignmentVersion(String stormId, Runnable callback) throws Exception; +Integer assignmentVersion(String stormId, Runnable callback) throws Exception; -public List blobstoreInfo(String blobKey); +List blobstoreInfo(String blobKey); -public List nimbuses(); +List nimbuses(); -public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary); +void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary); -public List activeStorms(); +List activeStorms(); /** * Get a storm base for a topology * @param stormId the id of the topology * @param callback something to call if the data changes (best effort) * @return the StormBase or null if it is not alive. 
*/ -public StormBase stormBase(String stormId, Runnable callback); +StormBase stormBase(String stormId, Runnable callback); -public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port); +ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port); -public List getWorkerProfileRequests(String stormId, NodeInfo nodeInfo); +List getWorkerProfileRequests(String stormId, NodeInfo nodeInfo); -public List getTopologyProfileRequests(String stormId); +List getTopologyProfileRequests(String stormId); -public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest); +void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest); -public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest); +void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest); -public MapexecutorBeats(String stormId, Map executorNodePort); +Map
executorBeats(String stormId, Map executorNodePort); -public List supervisors(Runnable callback); +List supervisors(Runnable callback); -public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist +SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist -public void setupHeatbeats(String stormId); +void setupHeatbeats(String stormId); -public void teardownHeartbeats(String stormId); +void teardownHeartbeats(String stormId); -public void teardownTopologyErrors(String stormId); +void teardownTopologyErrors(String stormId); -public List heartbeatStorms(); +List heartbeatStorms(); -public List errorTopologies(); +List errorTopologies(); -public List backpressureTopologies(); +/** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */ --- End diff -- Filed STORM-2944. Can you please add 3.0 as a version to JIRA so that this JIRA can be associated with it? ---
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167379385 --- Diff: pom.xml --- @@ -259,6 +259,7 @@ 1.11 4.3.3 0.2.4 +2.0.1 --- End diff -- I don't see any references to jctools in any flux-related pom. Can you point to the specific offending location? ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167286165 --- Diff: pom.xml --- @@ -259,6 +259,7 @@ 1.11 4.3.3 0.2.4 +2.0.1 --- End diff -- Why does flux need jctools? Shouldn't it come with storm-client? ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167328981 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java --- @@ -44,14 +48,15 @@ private final Boolean isEventLoggers; private final Boolean isDebug; private final RotatingMappending; +private TupleInfo globalTupleInfo = new TupleInfo(); // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed). --- End diff -- This feels like a good assumption for a spout, but I would like to understand the cost of making this thread safe (thread local instance etc), and at least document it if that cost is high, or preferably find a low cost solution to throw an exception if it does happen. ---
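One low-cost option along the lines requested above is an owner-thread check: remember the first emitting thread and throw if a different thread ever calls in. This is a hedged sketch (the class and method names are illustrative, not the PR's actual collector), costing one reference comparison per emit:

```java
import java.util.ConcurrentModificationException;

// Sketch of a cheap single-writer guard an emit() path could perform.
class SingleThreadedCollector {
    private Thread owner; // lazily bound to the first emitting thread

    void emit(Object tuple) {
        checkOwner();
        // ... actual emit logic elided ...
    }

    private void checkOwner() {
        Thread current = Thread.currentThread();
        if (owner == null) {
            owner = current; // first caller becomes the owner
        } else if (owner != current) {
            throw new ConcurrentModificationException(
                "Collector used from " + current.getName()
                + " but owned by " + owner.getName());
        }
    }
}

public class OwnerCheckDemo {
    public static void main(String[] args) throws InterruptedException {
        SingleThreadedCollector c = new SingleThreadedCollector();
        c.emit("a");
        c.emit("b"); // same thread: fine

        final boolean[] violated = {false};
        Thread other = new Thread(() -> {
            try {
                c.emit("c"); // cross-thread use: detected
            } catch (ConcurrentModificationException e) {
                violated[0] = true;
            }
        });
        other.start();
        other.join();
        System.out.println("violation detected: " + violated[0]); // prints: violation detected: true
    }
}
```

The `owner` field is published safely to the second thread here only because `Thread.start()` happens-after the first emits; a production version would want `volatile` or an assertion-only guard.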
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167286543 --- Diff: storm-client/src/jvm/org/apache/storm/Config.java --- @@ -890,30 +871,91 @@ public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines"; /** - * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency - * vs. CPU usage + * Selects the Bolt's Wait Strategy to use when there are no incoming msgs. Used to trade off latency vs CPU usage. + */ +@isString --- End diff -- Can we check if this is an instance of the proper parent interface? ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167325679 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.daemon.worker; + +import org.apache.storm.Config; +import org.apache.storm.messaging.TaskMessage; +import org.apache.storm.policy.IWaitStrategy; +import org.apache.storm.serialization.ITupleSerializer; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.utils.JCQueue; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.TransferDrainer; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.Utils.SmartThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +// Transfers messages destined to other workers +class WorkerTransfer implements JCQueue.Consumer { +static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class); + +private final TransferDrainer drainer; +private 
WorkerState workerState; +private IWaitStrategy backPressureWaitStrategy; + +JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries maybe null (if no emits to those tasksIds from this worker) +AtomicBoolean[] remoteBackPressureStatus; // [[remoteTaskId] -> true/false : indicates if remote task is under BP. --- End diff -- Same here for package private. ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167288058 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Acker.java --- @@ -99,17 +100,19 @@ public void execute(Tuple input) { pending.put(id, curr); } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { resetTimeout = true; -if (curr != null) { +if (curr == null) { --- End diff -- Why is this change being made? ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167286681 --- Diff: storm-client/src/jvm/org/apache/storm/Config.java --- @@ -890,30 +871,91 @@ public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines"; /** - * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency - * vs. CPU usage + * Selects the Bolt's Wait Strategy to use when there are no incoming msgs. Used to trade off latency vs CPU usage. + */ +@isString +public static final String TOPOLOGY_BOLT_WAIT_STRATEGY = "topology.bolt.wait.strategy"; + +/** + * Configures park time for WaitStrategyPark. If set to 0, returns immediately (i.e busy wait). */ -@isInteger @NotNull -public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis"; +@isPositiveNumber(includeZero = true) +public static final String TOPOLOGY_BOLT_WAIT_PARK_MICROSEC = "topology.bolt.wait.park.microsec"; /** - * The number of tuples to batch before sending to the next thread. This number is just an initial suggestion and - * the code may adjust it as your topology runs. + * Configures number of iterations to spend in level 1 of WaitStrategyProgressive, before progressing to level 2 */ +@NotNull @isInteger @isPositiveNumber +public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT = "topology.bolt.wait.progressive.level1.count"; + +/** + * Configures number of iterations to spend in level 2 of WaitStrategyProgressive, before progressing to level 3 + */ @NotNull -public static final String TOPOLOGY_DISRUPTOR_BATCH_SIZE="topology.disruptor.batch.size"; +@isInteger +@isPositiveNumber +public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT = "topology.bolt.wait.progressive.level2.count"; /** - * The maximum age in milliseconds a batch can be before being sent to the next thread. 
This number is just an - * initial suggestion and the code may adjust it as your topology runs. + * Configures sleep time for WaitStrategyProgressive. */ +@NotNull +@isPositiveNumber(includeZero = true) +public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = "topology.bolt.wait.progressive.level3.sleep.millis"; + + +/** + * A class that implements a wait strategy for an upstream component (spout/bolt) trying to write to a downstream component + * whose recv queue is full + * + * 1. nextTuple emits no tuples + * 2. The spout has hit maxSpoutPending and can't emit any more tuples + */ +@isString +public static final String TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY="topology.backpressure.wait.strategy"; --- End diff -- Here too if this is a class name it would be good to verify that it is an instance of a given class early on. ---
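Verifying early could plausibly mean a config validator that resolves the configured class name and checks assignability at submit time. A sketch, using a stand-in interface rather than Storm's real `org.apache.storm.policy.IWaitStrategy`:

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical validator: fail fast if the configured class name does not
// implement the expected strategy interface, instead of failing at runtime.
public class WaitStrategyValidator {
    // Stand-in for org.apache.storm.policy.IWaitStrategy.
    public interface IWaitStrategy {
        void idle();
    }

    public static final class ParkStrategy implements IWaitStrategy {
        public void idle() { LockSupport.parkNanos(1); }
    }

    public static void validate(String className) {
        try {
            Class<?> cls = Class.forName(className);
            if (!IWaitStrategy.class.isAssignableFrom(cls)) {
                throw new IllegalArgumentException(
                    className + " does not implement IWaitStrategy");
            }
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No such class: " + className, e);
        }
    }

    public static void main(String[] args) {
        validate(ParkStrategy.class.getName()); // passes
        try {
            validate("java.lang.String");       // wrong type -> rejected up front
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

Checking `isAssignableFrom` during config validation moves the failure from worker startup (a `ClassCastException` deep in the executor) to topology submission, which is the "early on" the comment asks for.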
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167325584 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.daemon.worker; + +import org.apache.storm.Config; +import org.apache.storm.messaging.TaskMessage; +import org.apache.storm.policy.IWaitStrategy; +import org.apache.storm.serialization.ITupleSerializer; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.utils.JCQueue; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.TransferDrainer; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.Utils.SmartThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +// Transfers messages destined to other workers +class WorkerTransfer implements JCQueue.Consumer { +static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class); + +private final TransferDrainer drainer; +private 
WorkerState workerState; +private IWaitStrategy backPressureWaitStrategy; + +JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries maybe null (if no emits to those tasksIds from this worker) --- End diff -- nit why does this need to be package private? Can we lock it down more? ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167282875 --- Diff: docs/Concepts.md --- @@ -113,3 +113,8 @@ Topologies execute across one or more worker processes. Each worker process is a **Resources:** * [Config.TOPOLOGY_WORKERS](javadocs/org/apache/storm/Config.html#TOPOLOGY_WORKERS): this config sets the number of workers to allocate for executing the topology + +### Performance Tuning + +Refer to [performance tuning guide](docs/CONTRIBUTING.md) --- End diff -- I don't think `CONTRIBUTING.md` is the performance tuning guide; also, the path is relative, and since `Performance.md` is in the same directory as this file, we don't need the `docs/` prefix in the link. ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167287026 --- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java --- @@ -97,6 +97,8 @@ public void run() { // events. Time.sleep(1000); } +if(Thread.interrupted()) +this.active.set(false); --- End diff -- Nit: can we wrap this in `'{'` and `'}'`, and add a space after the `if`, to match the style guide? ---
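For illustration, the brace-and-space style being requested would look roughly like this. The class below is a simplified stand-in for StormTimer's loop body (not the real class), with a deterministic `main` that simulates the interrupt arriving during the sleep:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in sketch of the loop body with the requested style:
// a space after `if` and the body wrapped in braces.
public class TimerLoopSketch {
    final AtomicBoolean active = new AtomicBoolean(true);

    void runOnce() {
        try {
            Thread.sleep(10); // stand-in for Time.sleep(1000)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag so the check below sees it
        }
        if (Thread.interrupted()) {
            this.active.set(false);
        }
    }

    public static void main(String[] args) {
        TimerLoopSketch t = new TimerLoopSketch();
        Thread.currentThread().interrupt(); // simulate an interrupt arriving mid-sleep
        t.runOnce();
        System.out.println("active=" + t.active.get()); // prints: active=false
    }
}
```

Note that `Thread.interrupted()` clears the interrupt flag as a side effect, which is why the catch block above re-interrupts before the check.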
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167330523 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.messaging.netty; + +import org.apache.storm.serialization.KryoValuesDeserializer; +import org.apache.storm.serialization.KryoValuesSerializer; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; + +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation +public class BackPressureStatus implements java.io.Serializable { --- End diff -- Why does this need to be serializable if we are explicitly using kryo for all of the serialization? Can we just register the class with kryo instead so this will also work when java serialization is disabled? ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167331543 --- Diff: storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java --- @@ -194,7 +194,12 @@ public void reportError(Throwable t) { public void emitDirect(int task, String stream, List values, Object id) { throw new UnsupportedOperationException("Trident does not support direct streams"); } - + +@Override +public void flush() { +//NOOP //TODO: Roshan: validate if this is OK --- End diff -- TODO needs to go away. Is this OK to not have a flush? ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167326878 --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java --- @@ -313,107 +330,74 @@ public void metricsTick(Task taskData, TupleImpl tuple) { protected void setupMetrics() { for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) { StormTimer timerTask = workerData.getUserTimer(); -timerTask.scheduleRecurring(interval, interval, new Runnable() { -@Override -public void run() { -TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(interval), -(int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID); -List metricsTickTuple = -Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple)); -receiveQueue.publish(metricsTickTuple); +timerTask.scheduleRecurring(interval, interval, +() -> { +TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID, +(int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID); +AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple); +try { +receiveQueue.publish(metricsTickTuple); +receiveQueue.flush(); // avoid buffering +} catch (InterruptedException e) { +LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag."); +Thread.currentThread().interrupt(); +return; +} } -}); -} -} - -public void sendUnanchored(Task task, String stream, List values, ExecutorTransfer transfer) { -Tuple tuple = task.getTuple(stream, values); -List tasks = task.getOutgoingTasks(stream, values); -for (Integer t : tasks) { -transfer.transfer(t, tuple); -} -} - -/** - * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api). 
- */ -public void sendToEventLogger(Executor executor, Task taskData, List values, - String componentId, Object messageId, Random random) { -MapcomponentDebug = executor.getStormComponentDebug().get(); -DebugOptions debugOptions = componentDebug.get(componentId); -if (debugOptions == null) { -debugOptions = componentDebug.get(executor.getStormId()); -} -double spct = ((debugOptions != null) && (debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0; -if (spct > 0 && (random.nextDouble() * 100) < spct) { -sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID, -new Values(componentId, messageId, System.currentTimeMillis(), values), -executor.getExecutorTransfer()); +); } } -public void reflectNewLoadMapping(LoadMapping loadMapping) { -for (LoadAwareCustomStreamGrouping g : groupers) { -g.refreshLoad(loadMapping); -} -} - -private void registerBackpressure() { -receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() { -@Override -public void highWaterMark() throws Exception { -LOG.debug("executor " + executorId + " is congested, set backpressure flag true"); - WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger()); -} - -@Override -public void lowWaterMark() throws Exception { -LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false"); - WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger()); -} -}); - receiveQueue.setHighWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK))); - receiveQueue.setLowWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK))); - receiveQueue.setEnableBackpressure(ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false)); -} - protected void setupTicks(boolean
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167293987 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java --- @@ -456,137 +450,135 @@ public void refreshStormActive(Runnable callback) { } } -public void refreshThrottle() { -boolean backpressure = stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle); -this.throttleOn.set(backpressure); -} - -private static double getQueueLoad(DisruptorQueue q) { -DisruptorQueue.QueueMetrics qMetrics = q.getMetrics(); +private static double getQueueLoad(JCQueue q) { +JCQueue.QueueMetrics qMetrics = q.getMetrics(); return ((double) qMetrics.population()) / qMetrics.capacity(); } public void refreshLoad(List execs) { -Set remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(taskIds)); +Set remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(localTaskIds)); Long now = System.currentTimeMillis(); MaplocalLoad = new HashMap<>(); -for (IRunningExecutor exec: execs) { +for (IRunningExecutor exec : execs) { double receiveLoad = getQueueLoad(exec.getReceiveQueue()); -double sendLoad = getQueueLoad(exec.getSendQueue()); -localLoad.put(exec.getExecutorId().get(0).intValue(), Math.max(receiveLoad, sendLoad)); +localLoad.put(exec.getExecutorId().get(0).intValue(), receiveLoad); } Map remoteLoad = new HashMap<>(); cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks))); loadMapping.setLocal(localLoad); loadMapping.setRemote(remoteLoad); -if (now > nextUpdate.get()) { +if (now > nextLoadUpdate.get()) { receiver.sendLoadMetrics(localLoad); -nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS); +nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS); } } +// checks if the tasks which had back pressure are now free again. 
if so, sends an update to other workers +public void refreshBackPressureStatus() { +LOG.debug("Checking for change in Backpressure status on worker's tasks"); +boolean bpSituationChanged = bpTracker.refreshBpTaskList(); +if (bpSituationChanged) { +BackPressureStatus bpStatus = bpTracker.getCurrStatus(); +receiver.sendBackPressureStatus(bpStatus); +} +} + + /** * we will wait all connections to be ready and then activate the spout/bolt * when the worker bootup. */ public void activateWorkerWhenAllConnectionsReady() { int delaySecs = 0; int recurSecs = 1; -refreshActiveTimer.schedule(delaySecs, new Runnable() { -@Override public void run() { +refreshActiveTimer.schedule(delaySecs, +() -> { if (areAllConnectionsReady()) { LOG.info("All connections are ready for worker {}:{} with id {}", assignmentId, port, workerId); isWorkerActive.set(Boolean.TRUE); } else { refreshActiveTimer.schedule(recurSecs, () -> activateWorkerWhenAllConnectionsReady(), false, 0); } } -}); +); } public void registerCallbacks() { LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port); receiver.registerRecv(new DeserializingConnectionCallback(topologyConf, getWorkerTopologyContext(), -this::transferLocal)); +this::transferLocalBatch)); +// Send curr BackPressure status to new clients +receiver.registerNewConnectionResponse( +() -> { +BackPressureStatus bpStatus = bpTracker.getCurrStatus(); +LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus); +return bpStatus; +} +); } -public void transferLocal(List tupleBatch) { -Map grouped = new HashMap<>(); -for (AddressedTuple tuple : tupleBatch) { -Integer executor = taskToShortExecutor.get(tuple.dest); -if (null == executor) { -LOG.warn("Received invalid messages for unknown tasks. Dropping... "); -
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167287393 --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java --- @@ -40,109 +40,117 @@ import org.apache.storm.nimbus.NimbusInfo; public interface IStormClusterState { -public List assignments(Runnable callback); +List assignments(Runnable callback); -public Assignment assignmentInfo(String stormId, Runnable callback); +Assignment assignmentInfo(String stormId, Runnable callback); -public VersionedData assignmentInfoWithVersion(String stormId, Runnable callback); +VersionedData assignmentInfoWithVersion(String stormId, Runnable callback); -public Integer assignmentVersion(String stormId, Runnable callback) throws Exception; +Integer assignmentVersion(String stormId, Runnable callback) throws Exception; -public List blobstoreInfo(String blobKey); +List blobstoreInfo(String blobKey); -public List nimbuses(); +List nimbuses(); -public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary); +void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary); -public List activeStorms(); +List activeStorms(); /** * Get a storm base for a topology * @param stormId the id of the topology * @param callback something to call if the data changes (best effort) * @return the StormBase or null if it is not alive. 
*/ -public StormBase stormBase(String stormId, Runnable callback); +StormBase stormBase(String stormId, Runnable callback); -public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port); +ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port); -public List getWorkerProfileRequests(String stormId, NodeInfo nodeInfo); +List getWorkerProfileRequests(String stormId, NodeInfo nodeInfo); -public List getTopologyProfileRequests(String stormId); +List getTopologyProfileRequests(String stormId); -public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest); +void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest); -public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest); +void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest); -public MapexecutorBeats(String stormId, Map executorNodePort); +Map
executorBeats(String stormId, Map executorNodePort); -public List supervisors(Runnable callback); +List supervisors(Runnable callback); -public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist +SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist -public void setupHeatbeats(String stormId); +void setupHeatbeats(String stormId); -public void teardownHeartbeats(String stormId); +void teardownHeartbeats(String stormId); -public void teardownTopologyErrors(String stormId); +void teardownTopologyErrors(String stormId); -public List heartbeatStorms(); +List heartbeatStorms(); -public List errorTopologies(); +List errorTopologies(); -public List backpressureTopologies(); +/** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */ --- End diff -- Could you file a JIRA for us to remove it in 3.x? ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167288382 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java --- @@ -362,6 +362,8 @@ public static void addSystemStreams(StormTopology topology) { public static void addEventLogger(Map conf, StormTopology topology) { Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS))); +if(numExecutors==null || numExecutors==0) --- End diff -- Nit: can we fix the style here with a space after the `if` and '{' '}'? ---
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167285848 --- Diff: flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvironmentTest.java --- @@ -18,6 +18,7 @@ package org.apache.storm.flux.multilang; +import org.junit.Ignore; --- End diff -- Nit: I don't think this is used here, so can we remove it? ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r167097905 --- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java --- @@ -17,72 +17,124 @@ */ package org.apache.storm.executor; -import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.EventHandler; import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; import org.apache.storm.serialization.KryoTupleSerializer; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.DisruptorQueue; -import org.apache.storm.utils.MutableObject; +import org.apache.storm.utils.JCQueue; import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.Queue; -public class ExecutorTransfer implements EventHandler, Callable { +// Every executor has an instance of this class +public class ExecutorTransfer { private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class); private final WorkerState workerData; -private final DisruptorQueue batchTransferQueue; -private final MaptopoConf; private final KryoTupleSerializer serializer; -private final MutableObject cachedEmit; private final boolean isDebug; +private final int producerBatchSz; +private int remotesBatchSz = 0; +private int indexingBase = 0; +private ArrayList localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker +private ArrayList queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. 
: outbound Qs for this executor instance -public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map topoConf) { + +public ExecutorTransfer(WorkerState workerData, Map topoConf) { this.workerData = workerData; -this.batchTransferQueue = batchTransferQueue; -this.topoConf = topoConf; this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext()); -this.cachedEmit = new MutableObject(new ArrayList<>()); this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false); +this.producerBatchSz = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE)); +} + +// to be called after all Executor objects in the worker are created and before this object is used +public void initLocalRecvQueues() { +Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get(); +this.localReceiveQueues = Utils.convertToArray( workerData.getLocalReceiveQueues(), minTaskId); +this.indexingBase = minTaskId; +this.queuesToFlush = new ArrayList(Collections.nCopies(localReceiveQueues.size(), null) ); } -public void transfer(int task, Tuple tuple) { -AddressedTuple val = new AddressedTuple(task, tuple); +// adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null) +public boolean tryTransfer(AddressedTuple addressedTuple, Queue pendingEmits) { if (isDebug) { -LOG.info("TRANSFERRING tuple {}", val); +LOG.info("TRANSFERRING tuple {}", addressedTuple); +} + +JCQueue localQueue = getLocalQueue(addressedTuple); +if (localQueue!=null) { +return tryTransferLocal(addressedTuple, localQueue, pendingEmits); +} else { +if (remotesBatchSz >= producerBatchSz) { +if ( !workerData.tryFlushRemotes() ) { +if (pendingEmits != null) { +pendingEmits.add(addressedTuple); +} +return false; +} +remotesBatchSz = 0; --- End diff -- @roshannaik and me briefly talked about this and agree this should be the thing to fix. 
A clear but higher-impact fix is guarding the whole else statement with `synchronized` (or a more efficient locking scheme, if one exists), but @roshannaik wanted to explore ways to minimize the impact of synchronization. ---
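The `synchronized` guard discussed above can be sketched roughly as follows. This is only an illustration of the locking idea, not the actual Storm fix: class and method names are stand-ins, and `tryFlushRemotes()` here is a trivial placeholder for `WorkerState.tryFlushRemotes()`.

```java
import java.util.Queue;

// Illustrative sketch: serializing the remote-transfer path so that
// remotesBatchSz updates and flushes cannot interleave across threads.
public class RemoteTransferSketch {
    private final Object remoteLock = new Object();
    private final int producerBatchSz = 2;  // stand-in for TOPOLOGY_PRODUCER_BATCH_SIZE
    private int remotesBatchSz = 0;
    int flushCount = 0;

    // placeholder for WorkerState.tryFlushRemotes(); always succeeds here
    boolean tryFlushRemotes() { flushCount++; return true; }

    // adds the tuple to the remote batch; on a full batch, flushes first,
    // falling back to pendingEmits when the flush cannot proceed
    public boolean tryTransferRemote(Object tuple, Queue<Object> pendingEmits) {
        synchronized (remoteLock) {
            if (remotesBatchSz >= producerBatchSz) {
                if (!tryFlushRemotes()) {
                    if (pendingEmits != null) {
                        pendingEmits.add(tuple);
                    }
                    return false;
                }
                remotesBatchSz = 0;
            }
            remotesBatchSz++;
            return true;
        }
    }
}
```

The trade-off flagged in the thread is visible here: every emit on the hot path now takes a monitor, which is why a lighter-weight scheme was worth exploring.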
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r164971328 --- Diff: storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -89,63 +107,80 @@ public void emitDirect(int taskId, String streamId, Collection anchors, L } } } +msgId = MessageId.makeId(anchorsToIds); +} else { +msgId = MessageId.makeUnanchored(); } -MessageId msgId = MessageId.makeId(anchorsToIds); -TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId); -executor.getExecutorTransfer().transfer(t, tupleExt); +TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId); +xsfer.tryTransfer(new AddressedTuple(t, tupleExt), executor.getPendingEmits()); } if (isEventLoggers) { -executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random); +task.sendToEventLogger(executor, values, executor.getComponentId(), null, random, executor.getPendingEmits()); } return outTasks; } @Override public void ack(Tuple input) { +if(!ackingEnabled) +return; long ackValue = ((TupleImpl) input).getAckVal(); MapanchorsToIds = input.getMessageId().getAnchorsToIds(); for (Map.Entry entry : anchorsToIds.entrySet()) { -executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID, +task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID, new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)), -executor.getExecutorTransfer()); +executor.getExecutorTransfer(), executor.getPendingEmits()); } long delta = tupleTimeDelta((TupleImpl) input); if (isDebug) { LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input); } -BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta); -boltAckInfo.applyOn(taskData.getUserContext()); + +if ( !task.getUserContext().getHooks().isEmpty() ) { +BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta); 
+boltAckInfo.applyOn(task.getUserContext()); +} if (delta >= 0) { -((BoltExecutorStats) executor.getStats()).boltAckedTuple( -input.getSourceComponent(), input.getSourceStreamId(), delta); +executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta); } } @Override public void fail(Tuple input) { +if(!ackingEnabled) +return; Set roots = input.getMessageId().getAnchors(); for (Long root : roots) { -executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID, -new Values(root), executor.getExecutorTransfer()); +task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID, +new Values(root), executor.getExecutorTransfer(), executor.getPendingEmits()); } long delta = tupleTimeDelta((TupleImpl) input); if (isDebug) { LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input); } BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta); -boltFailInfo.applyOn(taskData.getUserContext()); -if (delta >= 0) { -((BoltExecutorStats) executor.getStats()).boltFailedTuple( -input.getSourceComponent(), input.getSourceStreamId(), delta); +boltFailInfo.applyOn(task.getUserContext()); +if (delta != 0) { --- End diff -- Looks like missed spot : this should be `delta >= 0`. https://github.com/apache/storm/pull/2241/files?diff=split#r158213916 ``` (when (<= 0 delta) (stats/bolt-failed-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) delta ``` ---
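The missed spot above comes down to a one-character predicate. Assuming, as the quoted Clojure `(<= 0 delta)` indicates, that `tupleTimeDelta()` returns a negative value for tuples that were not sampled for latency stats, the guard must be `delta >= 0`: the `delta != 0` in the diff would wrongly record unsampled tuples and also skip genuine 0 ms deltas. A minimal sketch (names are illustrative):

```java
// Sketch of the sampling guard discussed above.
public class FailStatsSketch {
    // mirrors the Clojure predicate (<= 0 delta): record only sampled tuples
    static boolean shouldRecordFailedTuple(long delta) {
        return delta >= 0;
    }

    // the buggy guard from the diff, for contrast
    static boolean buggyGuard(long delta) {
        return delta != 0;
    }
}
```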
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r164956555 --- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java --- @@ -17,72 +17,124 @@ */ package org.apache.storm.executor; -import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.EventHandler; import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; import org.apache.storm.serialization.KryoTupleSerializer; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.DisruptorQueue; -import org.apache.storm.utils.MutableObject; +import org.apache.storm.utils.JCQueue; import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.Queue; -public class ExecutorTransfer implements EventHandler, Callable { +// Every executor has an instance of this class +public class ExecutorTransfer { private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class); private final WorkerState workerData; -private final DisruptorQueue batchTransferQueue; -private final MaptopoConf; private final KryoTupleSerializer serializer; -private final MutableObject cachedEmit; private final boolean isDebug; +private final int producerBatchSz; +private int remotesBatchSz = 0; +private int indexingBase = 0; +private ArrayList localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker +private ArrayList queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. 
: outbound Qs for this executor instance -public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map topoConf) { + +public ExecutorTransfer(WorkerState workerData, Map topoConf) { this.workerData = workerData; -this.batchTransferQueue = batchTransferQueue; -this.topoConf = topoConf; this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext()); -this.cachedEmit = new MutableObject(new ArrayList<>()); this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false); +this.producerBatchSz = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE)); +} + +// to be called after all Executor objects in the worker are created and before this object is used +public void initLocalRecvQueues() { +Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get(); +this.localReceiveQueues = Utils.convertToArray( workerData.getLocalReceiveQueues(), minTaskId); +this.indexingBase = minTaskId; +this.queuesToFlush = new ArrayList(Collections.nCopies(localReceiveQueues.size(), null) ); } -public void transfer(int task, Tuple tuple) { -AddressedTuple val = new AddressedTuple(task, tuple); +// adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null) +public boolean tryTransfer(AddressedTuple addressedTuple, Queue pendingEmits) { if (isDebug) { -LOG.info("TRANSFERRING tuple {}", val); +LOG.info("TRANSFERRING tuple {}", addressedTuple); +} + +JCQueue localQueue = getLocalQueue(addressedTuple); +if (localQueue!=null) { +return tryTransferLocal(addressedTuple, localQueue, pendingEmits); +} else { +if (remotesBatchSz >= producerBatchSz) { +if ( !workerData.tryFlushRemotes() ) { +if (pendingEmits != null) { +pendingEmits.add(addressedTuple); +} +return false; +} +remotesBatchSz = 0; --- End diff -- @revans2 There is one instance of the ExecutorTransfer object per Executor. 
So the only way to invoke this concurrently is as follows: background threads are spun up by the spout/bolt executor, and `outputcollector.emit()` is called without any external synchronization. Is that the situation you were thinking of? ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r164955129 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.daemon.worker; + +import org.apache.storm.messaging.netty.BackPressureStatus; +import org.apache.storm.utils.JCQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.storm.Constants.SYSTEM_TASK_ID; + +public class BackPressureTracker { +static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class); + +private final MapbpTasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration +private final Set nonBpTasks = ConcurrentHashMap.newKeySet(); +private final String workerId; + +public BackPressureTracker(String workerId, List allLocalTasks) { +this.workerId = workerId; +this.nonBpTasks.addAll(allLocalTasks);// all tasks are considered 
to be not under BP initially +this.nonBpTasks.remove((int)SYSTEM_TASK_ID); // not tracking system task +} + +/* called by transferLocalBatch() on NettyWorker thread + * returns true if an update was recorded, false if taskId is already under BP + */ +public boolean recordBackpressure(Integer taskId, JCQueue recvQ) { +if (nonBpTasks.remove(taskId)) { +bpTasks.put(taskId, recvQ); +return true; +} +return false; +} + +// returns true if there was a change in the BP situation +public boolean refreshBpTaskList() { +boolean changed = false; +LOG.debug("Running Back Pressure status change check"); +for (Iterator > itr = bpTasks.entrySet().iterator(); itr.hasNext(); ) { +Entry entry = itr.next(); +if (entry.getValue().isEmptyOverflow()) { +// move task from bpTasks to noBpTasks +nonBpTasks.add(entry.getKey()); +itr.remove(); --- End diff -- Next update will simplify the logic by using a single map and fix this issue as well. ---
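A hedged sketch of the "single map" simplification mentioned in the reply (this is one plausible shape, not the actual follow-up commit): one `ConcurrentHashMap` from taskId to an under-backpressure flag, instead of moving entries between two concurrent collections. `ConcurrentHashMap.replace(key, value)` atomically swaps the value and returns the previous one, so transitions can be detected without a separate lock.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative single-map backpressure tracker.
public class SingleMapBpSketch {
    // taskId -> true iff the task is currently under backpressure
    private final ConcurrentHashMap<Integer, Boolean> tasks = new ConcurrentHashMap<>();

    public SingleMapBpSketch(List<Integer> allLocalTasks) {
        for (Integer t : allLocalTasks) {
            tasks.put(t, Boolean.FALSE); // initially no task is under BP
        }
    }

    // returns true only on a false -> true transition (an update was recorded)
    public boolean recordBackpressure(Integer taskId) {
        return Boolean.FALSE.equals(tasks.replace(taskId, Boolean.TRUE));
    }

    // returns true only on a true -> false transition
    public boolean recordNoBackpressure(Integer taskId) {
        return Boolean.TRUE.equals(tasks.replace(taskId, Boolean.FALSE));
    }
}
```

This also sidesteps the remove-then-put window between the two collections that the review comment was pointing at.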
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r163967849 --- Diff: storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java --- @@ -47,7 +47,6 @@ public static final String STORMS_ROOT = "storms"; public static final String SUPERVISORS_ROOT = "supervisors"; public static final String WORKERBEATS_ROOT = "workerbeats"; -public static final String BACKPRESSURE_ROOT = "backpressure"; --- End diff -- @revans2 can you clarify? 2.x will require topo jars to be rebuilt for it. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161361250 --- Diff: storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java --- @@ -76,6 +76,32 @@ public static Integer getInt(Object o, Integer defaultValue) { throw new IllegalArgumentException("Don't know how to convert " + o + " to int"); } +public static Long getLong(Object o) { +return getLong(o, null); +} + +public static Long getLong(Object o, Long defaultValue) { +if (null == o) { +return defaultValue; +} + +if ( o instanceof Long || +o instanceof Integer || +o instanceof Short || +o instanceof Byte) { +return ((Number) o).longValue(); +} else if (o instanceof Double) { +final long l = (Long) o; --- End diff -- Yes, my bad. I think it's ok to throw an exception for numbers larger than Long. I don't see the bug in getInt. ---
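The bug being acknowledged above is in the `Double` branch: `final long l = (Long) o;` casts a boxed `Double` to `Long`, which always throws `ClassCastException`. A hedged sketch of a corrected version (this is an illustration of the fix's shape, not the exact code that landed in Storm): the `Double` branch must convert through the primitive value, and lossy conversions are rejected.

```java
// Illustrative corrected getLong.
public class GetLongSketch {
    public static Long getLong(Object o, Long defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Long || o instanceof Integer
                || o instanceof Short || o instanceof Byte) {
            return ((Number) o).longValue();
        } else if (o instanceof Double) {
            double d = (Double) o;       // NOT (Long) o: that cast always throws
            long l = (long) d;
            if ((double) l == d) {       // accept only values a long holds exactly
                return l;
            }
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
    }
}
```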
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161361184 --- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java --- @@ -24,50 +24,46 @@ import org.apache.storm.task.GeneralTopologyContext; public class TupleImpl implements Tuple { -private final List values; -private final int taskId; -private final String streamId; -private final GeneralTopologyContext context; -private final MessageId id; +private List values; +private int taskId; +private String streamId; +private GeneralTopologyContext context; +private MessageId id; +private final String srcComponent; private Long _processSampleStartTime; private Long _executeSampleStartTime; private long _outAckVal = 0; - + public TupleImpl(Tuple t) { this.values = t.getValues(); this.taskId = t.getSourceTask(); this.streamId = t.getSourceStreamId(); this.id = t.getMessageId(); this.context = t.getContext(); -if (t instanceof TupleImpl) { +this.srcComponent = t.getSourceComponent(); +try { TupleImpl ti = (TupleImpl) t; this._processSampleStartTime = ti._processSampleStartTime; this._executeSampleStartTime = ti._executeSampleStartTime; this._outAckVal = ti._outAckVal; +} catch (ClassCastException e) { +// ignore ... if t is not a TupleImpl type .. faster than checking and then casting } } -public TupleImpl(GeneralTopologyContext context, List values, int taskId, String streamId, MessageId id) { +public TupleImpl(GeneralTopologyContext context, List values, String srcComponent, int taskId, String streamId, MessageId id) { this.values = Collections.unmodifiableList(values); this.taskId = taskId; this.streamId = streamId; this.id = id; this.context = context; - -String componentId = context.getComponentId(taskId); -Fields schema = context.getComponentOutputFields(componentId, streamId); -if(values.size()!=schema.size()) { -throw new IllegalArgumentException( -"Tuple created with wrong number of fields. 
" + -"Expected " + schema.size() + " fields but got " + -values.size() + " fields"); -} --- End diff -- Ok, enabling it for local mode. Doing my best to eliminate map lookups in critical path. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161359459 --- Diff: storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java --- @@ -20,6 +20,7 @@ import org.apache.storm.Config; import org.apache.storm.generated.ComponentCommon; import org.apache.storm.generated.StormTopology; +import org.apache.storm.messaging.netty.BackPressureStatus; --- End diff -- fixed. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161359446 --- Diff: storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java --- @@ -23,7 +23,7 @@ import java.io.Serializable; import java.util.List; -public class TupleInfo implements Serializable { +public final class TupleInfo implements Serializable { --- End diff -- I think I was wondering whether marking some of the frequently allocated and heavily used objects as final impacts perf. It's not necessary; I will revert it. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161358991 --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java --- @@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex } } +public Queue getPendingEmits() { +return pendingEmits; +} + /** * separated from mkExecutor in order to replace executor transfer in executor data for testing */ public ExecutorShutdown execute() throws Exception { LOG.info("Loading executor tasks " + componentId + ":" + executorId); -registerBackpressure(); -Utils.SmartThread systemThreads = -Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie); - String handlerName = componentId + "-executor" + executorId; -Utils.SmartThread handlers = +Utils.SmartThread handler = Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName); setupTicks(StatsUtil.SPOUT.equals(type)); LOG.info("Finished loading executor " + componentId + ":" + executorId); -return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask); +return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask); } public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception; -@SuppressWarnings("unchecked") @Override -public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception { -ArrayList addressedTuples = (ArrayList) event; -for (AddressedTuple addressedTuple : addressedTuples) { -TupleImpl tuple = (TupleImpl) addressedTuple.getTuple(); -int taskId = addressedTuple.getDest(); -if (isDebug) { -LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple); -} +public void accept(Object event) { +if (event == JCQueue.INTERRUPT) { +throw new RuntimeException(new InterruptedException("JCQ processing interrupted") ); +} +AddressedTuple addressedTuple = (AddressedTuple)event; +int taskId = addressedTuple.getDest(); + 
+TupleImpl tuple = (TupleImpl) addressedTuple.getTuple(); +if (isDebug) { +LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple); +} + +try { if (taskId != AddressedTuple.BROADCAST_DEST) { tupleActionFn(taskId, tuple); } else { for (Integer t : taskIds) { tupleActionFn(t, tuple); } } +} catch (Exception e) { +throw new RuntimeException(e); --- End diff -- [Utils.asyncLoop()](https://github.com/roshannaik/storm/blob/STORM-2306-2/storm-client/src/jvm/org/apache/storm/utils/Utils.java#L351) of the BoltExecutor or SpoutExecutor thread ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161358833 --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java --- @@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex } } +public Queue getPendingEmits() { +return pendingEmits; +} + /** * separated from mkExecutor in order to replace executor transfer in executor data for testing */ public ExecutorShutdown execute() throws Exception { LOG.info("Loading executor tasks " + componentId + ":" + executorId); -registerBackpressure(); -Utils.SmartThread systemThreads = -Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie); - String handlerName = componentId + "-executor" + executorId; -Utils.SmartThread handlers = +Utils.SmartThread handler = Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName); setupTicks(StatsUtil.SPOUT.equals(type)); LOG.info("Finished loading executor " + componentId + ":" + executorId); -return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask); +return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask); } public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception; -@SuppressWarnings("unchecked") @Override -public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception { -ArrayList addressedTuples = (ArrayList) event; -for (AddressedTuple addressedTuple : addressedTuples) { -TupleImpl tuple = (TupleImpl) addressedTuple.getTuple(); -int taskId = addressedTuple.getDest(); -if (isDebug) { -LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple); -} +public void accept(Object event) { +if (event == JCQueue.INTERRUPT) { +throw new RuntimeException(new InterruptedException("JCQ processing interrupted") ); --- End diff -- That exception bubbles up to and gets caught by the Utils.asyncLoop in the thread that 
calls the recvQueue.consume() [i.e. a BoltExecutor or SpoutExecutor thread] ---
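The control flow described in the two replies above can be illustrated with a stand-alone sketch (names are stand-ins for `JCQueue.INTERRUPT`, `Executor.accept()`, and `Utils.asyncLoop`, not Storm's actual classes): a sentinel object placed on the queue makes the consumer throw a `RuntimeException` wrapping an `InterruptedException`, and the surrounding run-loop catches it and shuts the thread down cleanly.

```java
import java.util.Queue;

public class InterruptSketch {
    // sentinel compared by identity, like JCQueue.INTERRUPT
    static final Object INTERRUPT = new Object();

    static void accept(Object event) {
        if (event == INTERRUPT) {
            throw new RuntimeException(new InterruptedException("queue processing interrupted"));
        }
        // ... normal tuple handling would go here ...
    }

    // stand-in for the asyncLoop: returns true iff the loop exited
    // because the interrupt sentinel bubbled up from accept()
    static boolean runLoop(Queue<Object> queue) {
        try {
            Object ev;
            while ((ev = queue.poll()) != null) {
                accept(ev);
            }
            return false; // queue drained normally
        } catch (RuntimeException e) {
            return e.getCause() instanceof InterruptedException;
        }
    }
}
```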
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161358582 --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java --- @@ -196,19 +197,21 @@ public static Executor mkExecutor(WorkerState workerState, List executorId executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS))); } +int minId = Integer.MAX_VALUE; MapidToTask = new HashMap<>(); for (Integer taskId : taskIds) { +minId = Math.min(minId, taskId); try { Task task = new Task(executor, taskId); -executor.sendUnanchored( -task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer()); +task.sendUnanchored( StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer(), null); // TODO: Roshan: does this get delivered/handled anywhere ? --- End diff -- I don't care too much, will retain if it has some use. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161358493 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java --- @@ -155,134 +150,159 @@ public void start() throws Exception { Subject.doAs(subject, new PrivilegedExceptionAction() { @Override public Object run() throws Exception { -workerState = -new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage, +return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials); +} +}); // Subject.doAs(...) + +} + +private Object loadWorker(MaptopologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map initCreds, Credentials initialCredentials) +throws Exception { +workerState = +new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage, stormClusterState); -// Heartbeat here so that worker process dies if this fails -// it's important that worker heartbeat to supervisor ASAP so that supervisor knows -// that worker is running and moves on -doHeartBeat(); +// Heartbeat here so that worker process dies if this fails +// it's important that worker heartbeat to supervisor ASAP so that supervisor knows +// that worker is running and moves on +doHeartBeat(); -executorsAtom = new AtomicReference<>(null); +executorsAtom = new AtomicReference<>(null); -// launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout -// to the supervisor -workerState.heartbeatTimer -.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> { -try { -doHeartBeat(); -} catch (IOException e) { -throw new RuntimeException(e); -} -}); +// launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout +// to the supervisor +workerState.heartbeatTimer +.scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> { +try { +doHeartBeat(); +} catch (IOException e) { +throw new RuntimeException(e); +} +}); -workerState.executorHeartbeatTimer -.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), +workerState.executorHeartbeatTimer +.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), Worker.this::doExecutorHeartbeats); -workerState.registerCallbacks(); +workerState.registerCallbacks(); -workerState.refreshConnections(null); +workerState.refreshConnections(null); -workerState.activateWorkerWhenAllConnectionsReady(); +workerState.activateWorkerWhenAllConnectionsReady(); -workerState.refreshStormActive(null); +workerState.refreshStormActive(null); -workerState.runWorkerStartHooks(); +workerState.runWorkerStartHooks(); -List newExecutors = new ArrayList(); -for (List e : workerState.getExecutors()) { -if (ConfigUtils.isLocalMode(topologyConf)) { -newExecutors.add( -LocalExecutor.mkExecutor(workerState, e, initCreds) -.execute()); -} else { -newExecutors.add( -Executor.mkExecutor(workerState, e, initCreds) -.execute()); -} -} -executorsAtom.set(newExecutors); +List execs = new ArrayList<>(); +for (List e : workerState.getExecutors()) { +if (ConfigUtils.isLocalMode(topologyConf)) { +Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds); +execs.add(
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161358387 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java --- @@ -22,24 +22,25 @@ public class SpoutThrottlingMetrics extends BuiltinMetrics { private final CountMetric skippedMaxSpoutMs = new CountMetric(); -private final CountMetric skippedThrottleMs = new CountMetric(); private final CountMetric skippedInactiveMs = new CountMetric(); +private final CountMetric skippedBackPressureMs = new CountMetric(); public SpoutThrottlingMetrics() { metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs); -metricMap.put("skipped-throttle-ms", skippedThrottleMs); metricMap.put("skipped-inactive-ms", skippedInactiveMs); +metricMap.put("skipped-backpressure-ms", skippedBackPressureMs); --- End diff -- fixed ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161357014 --- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java --- @@ -193,6 +210,24 @@ public void run() { }); } +/** + * Schedule a function to run recurrently + * @param delayMs the number of millis to delay before running the function + * @param recurMs the time between each invocation + * @param func the function to run + */ +public void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) { +scheduleMs(delayMs, new Runnable() { --- End diff -- Doesn't seem feasible due to the 'this' reference inside the lambda. ---
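The "not feasible" reply is a Java language point worth spelling out: inside a lambda, `this` refers to the enclosing instance, whereas inside an anonymous class it refers to the anonymous object itself. A recurring task must re-schedule *itself*, so the anonymous `Runnable` cannot be converted to a lambda. A self-contained illustration (the queue stands in for a real scheduler; names are not Storm's):

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class RecurSketch {
    final Queue<Runnable> scheduled = new ArrayDeque<>(); // stand-in scheduler
    int runs = 0;

    void scheduleMs(long delayMs, Runnable func) {
        scheduled.add(func);
    }

    void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) {
        scheduleMs(delayMs, new Runnable() {
            @Override
            public void run() {
                func.run();
                // `this` is this Runnable; a lambda's `this` would be the
                // enclosing RecurSketch, so there is no way to name itself.
                scheduleMs(recurMs, this);
            }
        });
    }
}
```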
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r161356532 --- Diff: docs/Performance.md --- @@ -0,0 +1,132 @@ +--- --- End diff -- I am putting a link to this doc from Concepts.md. If you have a better place in mind, please let me know. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r160066094 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java --- @@ -177,6 +195,35 @@ public BuiltinMetrics getBuiltInMetrics() { return builtInMetrics; } + +// Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument +public void sendUnanchored(String stream, List values, ExecutorTransfer transfer, Queue pendingEmits) { +Tuple tuple = getTuple(stream, values); +List tasks = getOutgoingTasks(stream, values); +for (Integer t : tasks) { +AddressedTuple addressedTuple = new AddressedTuple(t, tuple); +transfer.tryTransfer(addressedTuple, pendingEmits); +} +} + +/** + * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api). + */ +public void sendToEventLogger(Executor executor, List values, --- End diff -- I reread the code and found sampling percentage can be changed. I was thinking about reducing random.nextDouble(), but in this case we may not be able to do that. Please ignore my previous comment. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r160065976 --- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java --- @@ -24,50 +24,46 @@ import org.apache.storm.task.GeneralTopologyContext; public class TupleImpl implements Tuple { -private final List values; -private final int taskId; -private final String streamId; -private final GeneralTopologyContext context; -private final MessageId id; +private List values; +private int taskId; +private String streamId; +private GeneralTopologyContext context; +private MessageId id; +private final String srcComponent; private Long _processSampleStartTime; private Long _executeSampleStartTime; private long _outAckVal = 0; - + public TupleImpl(Tuple t) { this.values = t.getValues(); this.taskId = t.getSourceTask(); this.streamId = t.getSourceStreamId(); this.id = t.getMessageId(); this.context = t.getContext(); -if (t instanceof TupleImpl) { +this.srcComponent = t.getSourceComponent(); +try { TupleImpl ti = (TupleImpl) t; this._processSampleStartTime = ti._processSampleStartTime; this._executeSampleStartTime = ti._executeSampleStartTime; this._outAckVal = ti._outAckVal; +} catch (ClassCastException e) { +// ignore ... if t is not a TupleImpl type .. faster than checking and then casting } } -public TupleImpl(GeneralTopologyContext context, List values, int taskId, String streamId, MessageId id) { +public TupleImpl(GeneralTopologyContext context, List values, String srcComponent, int taskId, String streamId, MessageId id) { this.values = Collections.unmodifiableList(values); this.taskId = taskId; this.streamId = streamId; this.id = id; this.context = context; - -String componentId = context.getComponentId(taskId); -Fields schema = context.getComponentOutputFields(componentId, streamId); -if(values.size()!=schema.size()) { -throw new IllegalArgumentException( -"Tuple created with wrong number of fields. 
" + -"Expected " + schema.size() + " fields but got " + -values.size() + " fields"); -} --- End diff -- I think having a map associating pair of taskId, streamId to schema's size may help reducing the slowdown. I know we are going to avoid map lookup, but it looks like acceptable tradeoff to me if we still want to leave the sanity check. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r160011014 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java --- @@ -177,6 +195,35 @@ public BuiltinMetrics getBuiltInMetrics() { return builtInMetrics; } + +// Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument +public void sendUnanchored(String stream, List values, ExecutorTransfer transfer, Queue pendingEmits) { +Tuple tuple = getTuple(stream, values); +List tasks = getOutgoingTasks(stream, values); +for (Integer t : tasks) { +AddressedTuple addressedTuple = new AddressedTuple(t, tuple); +transfer.tryTransfer(addressedTuple, pendingEmits); +} +} + +/** + * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api). + */ +public void sendToEventLogger(Executor executor, List values, --- End diff -- @HeartSaVioR you pointed out some optimizations are possible to this .. that we can tackle in another jira ... can you elaborate or capture your thoughts into a jira ? ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r160010894 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java --- @@ -105,19 +134,16 @@ public void reportError(Throwable error) { msgId = MessageId.makeUnanchored(); } -TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId); -executor.getExecutorTransfer().transfer(t, tuple); +final TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId); +AddressedTuple adrTuple = new AddressedTuple(t, tuple); +executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits()); } if (isEventLoggers) { -executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random); +taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits()); } --- End diff -- @HeartSaVioR you pointed out some optimizations are possible to this .. that we can tackle in another jira ... can you elaborate or capture your thoughts into a jira ? ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159969475 --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java --- @@ -328,20 +328,22 @@ public static boolean isSystemId(String id) { * @return the newly created thread * @see Thread */ -public static SmartThread asyncLoop(final Callable afn, -boolean isDaemon, final Thread.UncaughtExceptionHandler eh, -int priority, final boolean isFactory, boolean startImmediately, -String threadName) { +public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh, +int priority, final boolean isFactory, boolean startImmediately, +String threadName) { SmartThread thread = new SmartThread(new Runnable() { public void run() { -Object s; try { -Callable fn = isFactory ? (Callable) afn.call() : afn; -while ((s = fn.call()) instanceof Long) { -Time.sleepSecs((Long) s); +final Callable fn = isFactory ? (Callable) afn.call() : afn; +while (true) { +final Long s = fn.call(); +if (s==null) // then stop running it +break; +if (s>0) +Thread.sleep(s); --- End diff -- This needs to be Time.sleep if we want simulated time to work properly ---
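[Editor's note] The reason `Time.sleep` matters here is that Storm's `Time` utility routes all sleeps through a facade that tests can switch to a simulated clock; a raw `Thread.sleep` would really block under simulated time. The sketch below shows the pattern only, assuming made-up names (`SimTime`); it mirrors the idea behind `org.apache.storm.utils.Time`, not its code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Pattern sketch: route all sleeps through one facade so tests can advance a
// fake clock instead of actually sleeping. A real implementation would use
// wait/notify rather than polling.
final class SimTime {
    private static volatile boolean simulating = false;
    private static final AtomicLong simMillis = new AtomicLong();

    static void startSimulating() { simulating = true; simMillis.set(0); }
    static void stopSimulating() { simulating = false; }
    static void advance(long ms) { simMillis.addAndGet(ms); }

    static long currentTimeMillis() {
        return simulating ? simMillis.get() : System.currentTimeMillis();
    }

    static void sleep(long ms) throws InterruptedException {
        if (!simulating) {
            Thread.sleep(ms);
            return;
        }
        long deadline = simMillis.get() + ms;
        while (simMillis.get() < deadline) {
            Thread.sleep(1); // poll the fake clock; illustrative only
        }
    }
}
```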
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159962361 --- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java --- @@ -17,72 +17,124 @@ */ package org.apache.storm.executor; -import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.EventHandler; import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; import org.apache.storm.serialization.KryoTupleSerializer; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.DisruptorQueue; -import org.apache.storm.utils.MutableObject; +import org.apache.storm.utils.JCQueue; import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.Queue; -public class ExecutorTransfer implements EventHandler, Callable { +// Every executor has an instance of this class +public class ExecutorTransfer { private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class); private final WorkerState workerData; -private final DisruptorQueue batchTransferQueue; -private final MaptopoConf; private final KryoTupleSerializer serializer; -private final MutableObject cachedEmit; private final boolean isDebug; +private final int producerBatchSz; +private int remotesBatchSz = 0; +private int indexingBase = 0; +private ArrayList localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker +private ArrayList queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. 
: outbound Qs for this executor instance -public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map topoConf) { + +public ExecutorTransfer(WorkerState workerData, Map topoConf) { this.workerData = workerData; -this.batchTransferQueue = batchTransferQueue; -this.topoConf = topoConf; this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext()); -this.cachedEmit = new MutableObject(new ArrayList<>()); this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false); +this.producerBatchSz = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE)); +} + +// to be called after all Executor objects in the worker are created and before this object is used +public void initLocalRecvQueues() { +Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get(); +this.localReceiveQueues = Utils.convertToArray( workerData.getLocalReceiveQueues(), minTaskId); +this.indexingBase = minTaskId; +this.queuesToFlush = new ArrayList(Collections.nCopies(localReceiveQueues.size(), null) ); } -public void transfer(int task, Tuple tuple) { -AddressedTuple val = new AddressedTuple(task, tuple); +// adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null) +public boolean tryTransfer(AddressedTuple addressedTuple, Queue pendingEmits) { if (isDebug) { -LOG.info("TRANSFERRING tuple {}", val); +LOG.info("TRANSFERRING tuple {}", addressedTuple); +} + +JCQueue localQueue = getLocalQueue(addressedTuple); +if (localQueue!=null) { +return tryTransferLocal(addressedTuple, localQueue, pendingEmits); +} else { +if (remotesBatchSz >= producerBatchSz) { +if ( !workerData.tryFlushRemotes() ) { +if (pendingEmits != null) { +pendingEmits.add(addressedTuple); +} +return false; +} +remotesBatchSz = 0; --- End diff -- Do we have a race condition here? 
I believe that this method can be called from multiple different threads, and if so then we now have to worry about remotesBatchSz staying consistent. ---
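[Editor's note] One way to address the consistency worry, sketched under the assumption that `tryTransfer` really can be reached from multiple threads: hold the batch counter in an `AtomicInteger` and reset it with `compareAndSet` so a concurrent flush cannot lose another thread's increments. `FlushTarget` is a stand-in for `workerData.tryFlushRemotes()`; this is not the fix the PR adopted, just an illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hedged sketch: thread-safe variant of the remotesBatchSz bookkeeping.
class RemoteBatchCounter {
    interface FlushTarget { boolean tryFlush(); } // stand-in for tryFlushRemotes()

    private final AtomicInteger remotesBatchSz = new AtomicInteger();
    private final int producerBatchSz;
    private final FlushTarget remotes;

    RemoteBatchCounter(int producerBatchSz, FlushTarget remotes) {
        this.producerBatchSz = producerBatchSz;
        this.remotes = remotes;
    }

    /** Returns false when the batch is full and the flush could not run. */
    boolean noteRemoteEmit() {
        int sz = remotesBatchSz.get();
        if (sz >= producerBatchSz) {
            if (!remotes.tryFlush()) {
                return false; // caller would park the tuple in pendingEmits
            }
            remotesBatchSz.compareAndSet(sz, 0); // reset only if no one raced us
        }
        remotesBatchSz.incrementAndGet();
        return true;
    }
}
```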
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159969170 --- Diff: storm-client/src/jvm/org/apache/storm/utils/JCQueue.java --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.utils; + +import org.apache.storm.policy.IWaitStrategy; +import org.apache.storm.metric.api.IStatefulObject; +import org.apache.storm.metric.internal.RateTracker; +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpscArrayQueue; +import org.jctools.queues.MpscUnboundedArrayQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + + +public final class JCQueue implements IStatefulObject { +private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class); + +public static final Object INTERRUPT = new Object(); + +private final ThroughputMeter emptyMeter = new ThroughputMeter("EmptyBatch"); --- End diff -- This is never reported anywhere... Do we want to just delete it and move ThroughputMeter to perf? ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159956952 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.daemon.worker; + +import org.apache.storm.messaging.netty.BackPressureStatus; +import org.apache.storm.utils.JCQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.storm.Constants.SYSTEM_TASK_ID; + +public class BackPressureTracker { +static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class); + +private final MapbpTasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration +private final Set nonBpTasks = ConcurrentHashMap.newKeySet(); +private final String workerId; + +public BackPressureTracker(String workerId, List allLocalTasks) { +this.workerId = workerId; +this.nonBpTasks.addAll(allLocalTasks);// all tasks are considered to 
be not under BP initially +this.nonBpTasks.remove((int)SYSTEM_TASK_ID); // not tracking system task +} + +/* called by transferLocalBatch() on NettyWorker thread + * returns true if an update was recorded, false if taskId is already under BP + */ +public boolean recordBackpressure(Integer taskId, JCQueue recvQ) { +if (nonBpTasks.remove(taskId)) { +bpTasks.put(taskId, recvQ); +return true; +} +return false; +} + +// returns true if there was a change in the BP situation +public boolean refreshBpTaskList() { +boolean changed = false; +LOG.debug("Running Back Pressure status change check"); +for (Iterator > itr = bpTasks.entrySet().iterator(); itr.hasNext(); ) { +Entry entry = itr.next(); +if (entry.getValue().isEmptyOverflow()) { +// move task from bpTasks to noBpTasks +nonBpTasks.add(entry.getKey()); +itr.remove(); --- End diff -- It looks like there is a race condition here with `recordBackpressure`. If `nonBpTasks.add(entry.getKey())` finishes and then we get a context switch and `recordBackpressure` completes fully for the same taskId then itr.remove happens we might have an inconsistent state. ---
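[Editor's note] The race described above comes from keeping the backpressure state in two collections that are updated one after the other. A hedged sketch of one alternative: a single `AtomicBoolean` per task, so each transition is one atomic operation and the state can never be in both "sets" at once. This is only an illustration of the idea, not the tracker the PR shipped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hedged sketch: one atomic flag per task instead of a bpTasks map plus a
// nonBpTasks set, so record/relieve cannot interleave into an inconsistent state.
class BpStateSketch {
    private final Map<Integer, AtomicBoolean> underBp = new ConcurrentHashMap<>();

    void track(int taskId) {
        underBp.put(taskId, new AtomicBoolean(false)); // not under BP initially
    }

    /** Returns true only for the transition not-under-BP -> under-BP. */
    boolean recordBackpressure(int taskId) {
        return underBp.get(taskId).compareAndSet(false, true);
    }

    /** Returns true only for the transition under-BP -> not-under-BP. */
    boolean relieveBackpressure(int taskId) {
        return underBp.get(taskId).compareAndSet(true, false);
    }
}
```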
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159968476 --- Diff: storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java --- @@ -76,6 +76,32 @@ public static Integer getInt(Object o, Integer defaultValue) { throw new IllegalArgumentException("Don't know how to convert " + o + " to int"); } +public static Long getLong(Object o) { +return getLong(o, null); +} + +public static Long getLong(Object o, Long defaultValue) { +if (null == o) { +return defaultValue; +} + +if ( o instanceof Long || +o instanceof Integer || +o instanceof Short || +o instanceof Byte) { +return ((Number) o).longValue(); +} else if (o instanceof Double) { +final long l = (Long) o; --- End diff -- This is not going to work here as a Long can never be larger than MAX_VALUE for a long or smaller than MIN_VALUE for a Long. Technically I think there may be a bug in getInt if the number is larger than a Long can hold. ---
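[Editor's note] The line under review casts a `Double` to `Long`, which throws `ClassCastException` at runtime. A hedged sketch of a corrected version: range-check the double value against the long range, then truncate. This is an illustration of the fix being discussed, not the code that was merged.

```java
// Sketch of a corrected getLong: handle the Double branch by range-checking
// the value and truncating, instead of casting Double to Long (which throws).
final class GetLongSketch {
    static Long getLong(Object o, Long defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Long || o instanceof Integer
                || o instanceof Short || o instanceof Byte) {
            return ((Number) o).longValue();
        } else if (o instanceof Double) {
            double d = (Double) o;
            if (d >= Long.MIN_VALUE && d <= Long.MAX_VALUE) {
                return (long) d; // truncates the fractional part
            }
            throw new IllegalArgumentException(o + " is out of range for a long");
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
    }
}
```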
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159969789 --- Diff: storm-server/src/main/java/org/apache/storm/Testing.java --- @@ -712,6 +712,6 @@ public static Tuple testTuple(List values, MkTupleParam param) { new HashMap<>(), new HashMap<>(), new AtomicBoolean(false)); -return new TupleImpl(context, values, 1, stream); +return new TupleImpl(context, values, "component", 1, stream); --- End diff -- This I think is the cause of some of the test failures ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159954152 --- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java --- @@ -193,6 +210,24 @@ public void run() { }); } +/** + * Schedule a function to run recurrently + * @param delayMs the number of millis to delay before running the function + * @param recurMs the time between each invocation + * @param func the function to run + */ +public void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) { +scheduleMs(delayMs, new Runnable() { --- End diff -- nit could we use a java 8 lambda here instead? ``` scheduleMs(delayMs, () -> { func.run(); // This avoids a race condition with cancel-timer. scheduleMs(recurMs, this, true, 0); }); ``` ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159957697 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java --- @@ -155,134 +150,159 @@ public void start() throws Exception { Subject.doAs(subject, new PrivilegedExceptionAction() { @Override public Object run() throws Exception { -workerState = -new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage, +return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials); +} +}); // Subject.doAs(...) + +} + +private Object loadWorker(MaptopologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map initCreds, Credentials initialCredentials) +throws Exception { +workerState = +new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage, stormClusterState); -// Heartbeat here so that worker process dies if this fails -// it's important that worker heartbeat to supervisor ASAP so that supervisor knows -// that worker is running and moves on -doHeartBeat(); +// Heartbeat here so that worker process dies if this fails +// it's important that worker heartbeat to supervisor ASAP so that supervisor knows +// that worker is running and moves on +doHeartBeat(); -executorsAtom = new AtomicReference<>(null); +executorsAtom = new AtomicReference<>(null); -// launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout -// to the supervisor -workerState.heartbeatTimer -.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> { -try { -doHeartBeat(); -} catch (IOException e) { -throw new RuntimeException(e); -} -}); +// launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout +// to the supervisor +workerState.heartbeatTimer +.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), 
() -> { +try { +doHeartBeat(); +} catch (IOException e) { +throw new RuntimeException(e); +} +}); -workerState.executorHeartbeatTimer -.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), +workerState.executorHeartbeatTimer +.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), Worker.this::doExecutorHeartbeats); -workerState.registerCallbacks(); +workerState.registerCallbacks(); -workerState.refreshConnections(null); +workerState.refreshConnections(null); -workerState.activateWorkerWhenAllConnectionsReady(); +workerState.activateWorkerWhenAllConnectionsReady(); -workerState.refreshStormActive(null); +workerState.refreshStormActive(null); -workerState.runWorkerStartHooks(); +workerState.runWorkerStartHooks(); -List newExecutors = new ArrayList(); -for (List e : workerState.getExecutors()) { -if (ConfigUtils.isLocalMode(topologyConf)) { -newExecutors.add( -LocalExecutor.mkExecutor(workerState, e, initCreds) -.execute()); -} else { -newExecutors.add( -Executor.mkExecutor(workerState, e, initCreds) -.execute()); -} -} -executorsAtom.set(newExecutors); +List execs = new ArrayList<>(); +for (List e : workerState.getExecutors()) { +if (ConfigUtils.isLocalMode(topologyConf)) { +Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds); +execs.add(
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159969910 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java --- @@ -79,6 +79,7 @@ protected final long lowMemoryThresholdMB; protected final long mediumMemoryThresholdMb; protected final long mediumMemoryGracePeriodMs; +private static int port = 5006; // TODO: Roshan: remove this after stabilization --- End diff -- Yes please make sure this is removed, as leaving it in place is a security vulnerability. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159954450 --- Diff: storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java --- @@ -47,7 +47,6 @@ public static final String STORMS_ROOT = "storms"; public static final String SUPERVISORS_ROOT = "supervisors"; public static final String WORKERBEATS_ROOT = "workerbeats"; -public static final String BACKPRESSURE_ROOT = "backpressure"; --- End diff -- In order to support running older topology versions under a newer 2.x nimbus/etc we should still keep around the basic setup and cleanup of the backpressure nodes in zookeeper at least until a 3.x release. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159967636 --- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java --- @@ -24,50 +24,46 @@ import org.apache.storm.task.GeneralTopologyContext; public class TupleImpl implements Tuple { -private final List values; -private final int taskId; -private final String streamId; -private final GeneralTopologyContext context; -private final MessageId id; +private List values; +private int taskId; +private String streamId; +private GeneralTopologyContext context; +private MessageId id; +private final String srcComponent; private Long _processSampleStartTime; private Long _executeSampleStartTime; private long _outAckVal = 0; - + public TupleImpl(Tuple t) { this.values = t.getValues(); this.taskId = t.getSourceTask(); this.streamId = t.getSourceStreamId(); this.id = t.getMessageId(); this.context = t.getContext(); -if (t instanceof TupleImpl) { +this.srcComponent = t.getSourceComponent(); +try { TupleImpl ti = (TupleImpl) t; this._processSampleStartTime = ti._processSampleStartTime; this._executeSampleStartTime = ti._executeSampleStartTime; this._outAckVal = ti._outAckVal; +} catch (ClassCastException e) { +// ignore ... if t is not a TupleImpl type .. faster than checking and then casting } } -public TupleImpl(GeneralTopologyContext context, List values, int taskId, String streamId, MessageId id) { +public TupleImpl(GeneralTopologyContext context, List values, String srcComponent, int taskId, String streamId, MessageId id) { this.values = Collections.unmodifiableList(values); this.taskId = taskId; this.streamId = streamId; this.id = id; this.context = context; - -String componentId = context.getComponentId(taskId); -Fields schema = context.getComponentOutputFields(componentId, streamId); -if(values.size()!=schema.size()) { -throw new IllegalArgumentException( -"Tuple created with wrong number of fields. 
" + -"Expected " + schema.size() + " fields but got " + -values.size() + " fields"); -} --- End diff -- I know this slows things down, but it is a good sanity check. Would it be possible to have a way to configure this on? Like in local mode? We would have to cache it though because just checking the conf might be as long as doing this check. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159949952 --- Diff: docs/Performance.md --- @@ -0,0 +1,132 @@ +--- --- End diff -- Great Documentation, but can we have some of the other docs link to it? ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159958378 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java --- @@ -155,134 +150,159 @@ public void start() throws Exception { Subject.doAs(subject, new PrivilegedExceptionAction() { @Override public Object run() throws Exception { -workerState = -new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage, +return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials); +} +}); // Subject.doAs(...) + +} + +private Object loadWorker(MaptopologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map initCreds, Credentials initialCredentials) +throws Exception { +workerState = +new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage, stormClusterState); -// Heartbeat here so that worker process dies if this fails -// it's important that worker heartbeat to supervisor ASAP so that supervisor knows -// that worker is running and moves on -doHeartBeat(); +// Heartbeat here so that worker process dies if this fails +// it's important that worker heartbeat to supervisor ASAP so that supervisor knows +// that worker is running and moves on +doHeartBeat(); -executorsAtom = new AtomicReference<>(null); +executorsAtom = new AtomicReference<>(null); -// launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout -// to the supervisor -workerState.heartbeatTimer -.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> { -try { -doHeartBeat(); -} catch (IOException e) { -throw new RuntimeException(e); -} -}); +// launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout +// to the supervisor +workerState.heartbeatTimer +.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), 
() -> { +try { +doHeartBeat(); +} catch (IOException e) { +throw new RuntimeException(e); +} +}); -workerState.executorHeartbeatTimer -.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), +workerState.executorHeartbeatTimer +.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), Worker.this::doExecutorHeartbeats); -workerState.registerCallbacks(); +workerState.registerCallbacks(); -workerState.refreshConnections(null); +workerState.refreshConnections(null); -workerState.activateWorkerWhenAllConnectionsReady(); +workerState.activateWorkerWhenAllConnectionsReady(); -workerState.refreshStormActive(null); +workerState.refreshStormActive(null); -workerState.runWorkerStartHooks(); +workerState.runWorkerStartHooks(); -List newExecutors = new ArrayList(); -for (List e : workerState.getExecutors()) { -if (ConfigUtils.isLocalMode(topologyConf)) { -newExecutors.add( -LocalExecutor.mkExecutor(workerState, e, initCreds) -.execute()); -} else { -newExecutors.add( -Executor.mkExecutor(workerState, e, initCreds) -.execute()); -} -} -executorsAtom.set(newExecutors); +List execs = new ArrayList<>(); +for (List e : workerState.getExecutors()) { +if (ConfigUtils.isLocalMode(topologyConf)) { +Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds); +execs.add(
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159963018 --- Diff: storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java --- @@ -23,7 +23,7 @@ import java.io.Serializable; import java.util.List; -public class TupleInfo implements Serializable { +public final class TupleInfo implements Serializable { --- End diff -- Why do we want this to be final? Just curious. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159955584 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java --- @@ -22,24 +22,25 @@ public class SpoutThrottlingMetrics extends BuiltinMetrics { private final CountMetric skippedMaxSpoutMs = new CountMetric(); -private final CountMetric skippedThrottleMs = new CountMetric(); private final CountMetric skippedInactiveMs = new CountMetric(); +private final CountMetric skippedBackPressureMs = new CountMetric(); public SpoutThrottlingMetrics() { metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs); -metricMap.put("skipped-throttle-ms", skippedThrottleMs); metricMap.put("skipped-inactive-ms", skippedInactiveMs); +metricMap.put("skipped-backpressure-ms", skippedBackPressureMs); --- End diff -- ./docs/Metrics.md describes these. It should be updated to remove skipped-throttle-ms and replace it with skipped-backpressure-ms (and preferably mention that in older versions of Storm, skipped-throttle-ms would have been similar). ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159960326
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
         }
     }

+    public Queue getPendingEmits() {
+        return pendingEmits;
+    }
+
     /**
      * separated from mkExecutor in order to replace executor transfer in executor data for testing
      */
     public ExecutorShutdown execute() throws Exception {
         LOG.info("Loading executor tasks " + componentId + ":" + executorId);
-        registerBackpressure();
-        Utils.SmartThread systemThreads =
-            Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
-
         String handlerName = componentId + "-executor" + executorId;
-        Utils.SmartThread handlers =
+        Utils.SmartThread handler =
             Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
         setupTicks(StatsUtil.SPOUT.equals(type));
         LOG.info("Finished loading executor " + componentId + ":" + executorId);
-        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+        return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
     }

     public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;

-    @SuppressWarnings("unchecked")
     @Override
-    public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
-        ArrayList addressedTuples = (ArrayList) event;
-        for (AddressedTuple addressedTuple : addressedTuples) {
-            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
-            int taskId = addressedTuple.getDest();
-            if (isDebug) {
-                LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
-            }
+    public void accept(Object event) {
+        if (event == JCQueue.INTERRUPT) {
+            throw new RuntimeException(new InterruptedException("JCQ processing interrupted"));
--- End diff --

Could you explain a bit more when JCQueue.INTERRUPT happens and where this exception is caught/handled? Almost every other place where we use an InterruptedException, it is treated as the worker going down in an orderly manner, so the thread ends without complaining. I just want to be sure that we don't accidentally shut down part of the worker in some odd cases. ---
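The pattern under discussion can be sketched as a queue consumer that recognizes a shared sentinel object and winds down in an orderly manner instead of letting an exception escape. This is an illustration of the reviewer's expectation, not JCQueue's actual API; the class, queue, and method names here are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Sketch of the sentinel-object shutdown pattern. INTERRUPT plays the role
// of JCQueue.INTERRUPT; everything else is an illustrative stand-in.
public class SentinelQueue {
    public static final Object INTERRUPT = new Object();

    private final Queue<Object> events = new ArrayDeque<>();

    public void publish(Object event) {
        events.add(event);
    }

    /**
     * Drains events into the handler until the INTERRUPT sentinel is seen.
     * Returning (rather than throwing) treats the sentinel as an orderly
     * shutdown signal, so only this consumer stops and no exception can
     * accidentally tear down other parts of the worker.
     * Returns the number of real events handled.
     */
    public int drainUntilInterrupt(Consumer<Object> handler) {
        int handled = 0;
        for (Object event = events.poll(); event != null; event = events.poll()) {
            if (event == INTERRUPT) {
                break; // orderly shutdown: stop consuming, no exception escapes
            }
            handler.accept(event);
            handled++;
        }
        return handled;
    }
}
```

Events published after the sentinel are simply left unprocessed, which is the "end the thread without complaining" behavior the comment describes.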
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159960500
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
         }
     }

+    public Queue getPendingEmits() {
+        return pendingEmits;
+    }
+
     /**
      * separated from mkExecutor in order to replace executor transfer in executor data for testing
      */
     public ExecutorShutdown execute() throws Exception {
         LOG.info("Loading executor tasks " + componentId + ":" + executorId);
-        registerBackpressure();
-        Utils.SmartThread systemThreads =
-            Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
-
         String handlerName = componentId + "-executor" + executorId;
-        Utils.SmartThread handlers =
+        Utils.SmartThread handler =
             Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
         setupTicks(StatsUtil.SPOUT.equals(type));
         LOG.info("Finished loading executor " + componentId + ":" + executorId);
-        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+        return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
     }

     public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;

-    @SuppressWarnings("unchecked")
     @Override
-    public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
-        ArrayList addressedTuples = (ArrayList) event;
-        for (AddressedTuple addressedTuple : addressedTuples) {
-            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
-            int taskId = addressedTuple.getDest();
-            if (isDebug) {
-                LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
-            }
+    public void accept(Object event) {
+        if (event == JCQueue.INTERRUPT) {
+            throw new RuntimeException(new InterruptedException("JCQ processing interrupted"));
+        }
+        AddressedTuple addressedTuple = (AddressedTuple) event;
+        int taskId = addressedTuple.getDest();
+
+        TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+        if (isDebug) {
+            LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+        }
+
+        try {
             if (taskId != AddressedTuple.BROADCAST_DEST) {
                 tupleActionFn(taskId, tuple);
             } else {
                 for (Integer t : taskIds) {
                     tupleActionFn(t, tuple);
                 }
             }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
--- End diff --

Where is this exception handled? ---
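The wrap-and-rethrow in the diff exists because a handler interface like this `accept(Object)` cannot declare checked exceptions, so the checked cause is wrapped in a RuntimeException that propagates up to the consumer loop's terminal error callback (the role `reportErrorDie` plays in `asyncLoop`). A minimal sketch of that flow, with illustrative names rather than Storm's actual classes:

```java
import java.util.function.Consumer;

// Sketch: a checked exception thrown inside a Consumer is wrapped in a
// RuntimeException and only unwrapped at the loop's single error handler.
public class WrapRethrow {
    static String handled = null;

    // Stand-in for the loop's terminal error callback (like reportErrorDie):
    // the one place the wrapped exception finally lands.
    static void reportError(Throwable t) {
        Throwable cause = (t.getCause() != null) ? t.getCause() : t;
        handled = cause.getMessage();
    }

    // Stand-in for the consumer loop around accept(): any RuntimeException
    // thrown by the handler reaches reportError, not random callers.
    static void runLoop(Consumer<Object> handler, Object event) {
        try {
            handler.accept(event);
        } catch (RuntimeException e) {
            reportError(e);
        }
    }
}
```

The answer the reviewer is after is therefore "wherever the loop that calls `accept` installs its error callback"; nothing between the throw site and that callback sees the exception.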
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159966670
--- Diff: storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java ---
@@ -20,6 +20,7 @@
 import org.apache.storm.Config;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.messaging.netty.BackPressureStatus;
--- End diff --

Is this change needed? Is this even used here? ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159959648
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -196,19 +197,21 @@ public static Executor mkExecutor(WorkerState workerState, List executorId
             executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),
                 ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
         }

+        int minId = Integer.MAX_VALUE;
         Map<Integer, Task> idToTask = new HashMap<>();
         for (Integer taskId : taskIds) {
+            minId = Math.min(minId, taskId);
             try {
                 Task task = new Task(executor, taskId);
-                executor.sendUnanchored(
-                    task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer());
+                task.sendUnanchored(StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer(), null);
                 // TODO: Roshan: does this get delivered/handled anywhere ?
--- End diff --

Answer: storm-core/test/clj/integration/org/apache/storm/integration_test.clj

It can sometimes be helpful for debugging when you have topology.debug enabled, but it is not that critical and can probably be removed if you want to. If not, please remove the TODO comment. ---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159968772
--- Diff: storm-client/src/jvm/org/apache/storm/utils/RunningAvg.java ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+public class RunningAvg {
+
+    private long n = 0;
+    private double oldM, newM, oldS, newS;
+    private String name;
+    public static int printFreq = 20_000_000;
+    private boolean disable;
+    private long count = 0;
+
+    public RunningAvg(String name, boolean disable) {
+        this(name, printFreq, disable);
+    }
+
+    public RunningAvg(String name, int printFreq) {
+        this(name, printFreq, false);
+    }
+
+    public RunningAvg(String name, int printFreq, boolean disable) {
+        this.name = name + "_" + Thread.currentThread().getName();
+        this.printFreq = printFreq;
+        this.disable = disable;
+    }
+
+    public void clear() {
+        n = 0;
+    }
+
+    public void pushLatency(long startMs) {
+        push(System.currentTimeMillis() - startMs);
+    }
+
+    public void push(long x) {
+        if (disable) {
+            return;
+        }
+
+        n++;
+
+        if (n == 1) {
+            oldM = newM = x;
+            oldS = 0;
+        } else {
+            newM = oldM + (x - oldM) / n;
+            newS = oldS + (x - oldM) * (x - newM);
+
+            // set up for next iteration
+            oldM = newM;
+            oldS = newS;
+        }
+        if (++count == printFreq) {
+            System.err.printf(" ***> %s - %,.2f\n", name, mean());
--- End diff --

Can we log this instead of printing it? Also, could you file a follow-on JIRA so that when we go to a better metrics implementation we use that instead of printing/logging? ---
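The `push()` method in the diff is Welford's online algorithm: a numerically stable, single-pass, O(1)-memory way to maintain a running mean (and variance). A minimal, dependency-free sketch of the same recurrence follows, using `java.util.logging` to illustrate the reviewer's "log instead of print" suggestion; the class and method names are illustrative, not the PR's actual code.

```java
import java.util.logging.Logger;

// Sketch of Welford's online mean/variance, the algorithm RunningAvg uses.
public class WelfordAvg {
    private static final Logger LOG = Logger.getLogger(WelfordAvg.class.getName());

    private long n = 0;
    private double mean = 0.0; // running mean of all values seen so far
    private double m2 = 0.0;   // running sum of squared deviations from the mean

    /** Welford's update: mean += delta/n; m2 += delta * (x - newMean). */
    public void push(double x) {
        n++;
        double delta = x - mean;
        mean += delta / n;
        m2 += delta * (x - mean);
    }

    public double mean() {
        return mean;
    }

    /** Population variance; use m2 / (n - 1) for the sample variance. */
    public double variance() {
        return (n > 1) ? m2 / n : 0.0;
    }

    /** Report through a logger rather than System.err, as the review asks. */
    public void report(String name) {
        LOG.info(() -> String.format("%s mean=%,.2f variance=%,.2f",
                                     name, mean(), variance()));
    }
}
```

Compared with the naive sum-of-squares approach, this formulation avoids catastrophic cancellation when the variance is small relative to the mean, which matters for latency measurements clustered around a large value.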