[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167388018
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --

Hmm, interesting. 
Wondering why ControlMessage, MessageBatch & SaslMessageToken don't follow 
the same pattern of registering?

Can you confirm that you are suggesting the following steps?
- Remove the inheritance from the Serializable interface
- Register BackPressureStatus.class with Kryo
- Remove the BackPressureStatus.buffer() and BackPressureStatus.read() 
methods?
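For reference, a minimal sketch of the registration approach being discussed (the nested class here is an illustrative stand-in for the real BackPressureStatus, not Storm's actual code; Storm's registrations live in SerializationFactory): registering the class lets Kryo fall back to its default FieldSerializer, so no Serializable marker and no hand-written buffer()/read() methods are needed.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class KryoRegistrationSketch {

    // Illustrative stand-in for the real BackPressureStatus; note: no Serializable
    public static class BackPressureStatus {
        public String workerId;
        public boolean bpOn;
    }

    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        // Registering the class is enough: Kryo uses its default
        // FieldSerializer, so no custom serializer is provided
        kryo.register(BackPressureStatus.class);

        BackPressureStatus status = new BackPressureStatus();
        status.workerId = "worker-1";
        status.bpOn = true;

        // Serialize to bytes
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Output out = new Output(bos);
        kryo.writeObject(out, status);
        out.close();

        // Deserialize and verify the round trip
        Input in = new Input(new ByteArrayInputStream(bos.toByteArray()));
        BackPressureStatus copy = kryo.readObject(in, BackPressureStatus.class);
        System.out.println(copy.workerId + " " + copy.bpOn);
    }
}
```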


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167387844
  
--- Diff: pom.xml ---
@@ -259,6 +259,7 @@
 1.11
 4.3.3
 0.2.4
+2.0.1
--- End diff --

I guess I misread things, please ignore this comment.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167387808
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -44,14 +48,15 @@
 private final Boolean isEventLoggers;
 private final Boolean isDebug;
 private final RotatingMap pending;
+private TupleInfo globalTupleInfo = new TupleInfo();  // thread 
safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --

Can we do a sanity check for the fast path, plus documentation?  Check if the 
thread id is the same as the id of the main thread we expect emits to come 
from.  If so, we go with the fast path; if not, we use a thread local or do some 
kind of locking, plus documentation about why you never want the spout to emit 
from a background thread.
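A hedged sketch of the sanity check being proposed (class and method names are illustrative, not the actual Storm types): record the id of the expected executor thread at construction, take the zero-allocation fast path when the caller matches, and fall back (or fail loudly) otherwise.

```java
public class ThreadAffineCollector {
    // Reusable scratch object for the single-threaded fast path (avoids per-emit allocation)
    private final StringBuilder globalScratch = new StringBuilder();
    private final long expectedThreadId;

    public ThreadAffineCollector() {
        // Assumption: the constructing thread is the executor thread emits come from
        this.expectedThreadId = Thread.currentThread().getId();
    }

    public String emit(String tuple) {
        if (Thread.currentThread().getId() == expectedThreadId) {
            // Fast path: safe to reuse the shared scratch object
            globalScratch.setLength(0);
            return globalScratch.append("emit:").append(tuple).toString();
        }
        // Slow path: a background thread called emit; allocate fresh state.
        // (Alternatively, throw here to document that spouts must not emit off-thread.)
        return new StringBuilder("emit:").append(tuple).toString();
    }

    public static void main(String[] args) throws Exception {
        ThreadAffineCollector c = new ThreadAffineCollector();
        System.out.println(c.emit("a"));                               // fast path
        Thread t = new Thread(() -> System.out.println(c.emit("b")));  // slow path
        t.start();
        t.join();
    }
}
```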


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167387609
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --


https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java#L66-L74


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167387629
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --

You should be able to get away with just registering the class and not 
providing a serializer.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167385890
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --

I was not aware of that. Do we have an example of what I should do instead?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167385590
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -44,14 +48,15 @@
 private final Boolean isEventLoggers;
 private final Boolean isDebug;
 private final RotatingMap pending;
+private TupleInfo globalTupleInfo = new TupleInfo();  // thread 
safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --

Accessing the Thread Local (TL) instance via ThreadLocal.get() typically 
involves a map lookup behind the scenes.

**Related Note:** I reluctantly used TL for latching on to the 
JCQueue.BatchInserter instance for the producers to JCQueue. Reluctant since I 
noticed a perf hit when doing some targeted microbenchmarking. I used it anyway 
because it was a perf improvement over the ConcurrentHashMap employed in 
Disruptor, and eliminating TL needed a bigger change to the interface and 
producers. I think it is possible to achieve a TL-free JCQueue and gain some 
perf... perhaps in a follow-up jira.

Although many decisions were measured, due to scope it was not feasible to 
measure each one. So, in the critical path, I have taken this general approach:
- avoid locks & synchronization, and try using lock/wait-free approaches 
where synchronization is unavoidable
- avoid map lookups and object creation

This was a case of avoiding synchronization, (TL) map lookups, and object 
allocation.
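To illustrate the cost being described (this is an assumption about JDK internals, not Storm code): ThreadLocal.get() consults a per-thread ThreadLocalMap, so a plain field read is cheaper on a hot path when single-threaded access can be guaranteed. A minimal sketch of the two access patterns:

```java
public class TlVsField {
    // ThreadLocal path: every get() goes through the per-thread ThreadLocalMap lookup
    private static final ThreadLocal<long[]> TL = ThreadLocal.withInitial(() -> new long[1]);

    // Plain-field path: a direct read; safe only if callers are externally single-threaded
    private final long[] plain = new long[1];

    long viaThreadLocal() { return ++TL.get()[0]; }

    long viaPlainField() { return ++plain[0]; }

    public static void main(String[] args) {
        TlVsField t = new TlVsField();
        for (int i = 0; i < 3; i++) {
            t.viaThreadLocal();
            t.viaPlainField();
        }
        System.out.println(TL.get()[0] + " " + t.plain[0]); // prints "3 3"
    }
}
```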


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167383283
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Acker.java ---
@@ -99,17 +100,19 @@ public void execute(Tuple input) {
 pending.put(id, curr);
 } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
 resetTimeout = true;
-if (curr != null) {
+if (curr == null) {
--- End diff --

Unintentional. This appears to be due to a mistake, most likely made when 
resolving merge conflicts while rebasing my code onto master sometime in Dec 
2017. Will revert. Thanks for catching it.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167381302
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
@@ -40,109 +40,117 @@
 import org.apache.storm.nimbus.NimbusInfo;
 
 public interface IStormClusterState {
-public List assignments(Runnable callback);
+List assignments(Runnable callback);
 
-public Assignment assignmentInfo(String stormId, Runnable callback);
+Assignment assignmentInfo(String stormId, Runnable callback);
 
-public VersionedData assignmentInfoWithVersion(String 
stormId, Runnable callback);
+VersionedData assignmentInfoWithVersion(String stormId, 
Runnable callback);
 
-public Integer assignmentVersion(String stormId, Runnable callback) 
throws Exception;
+Integer assignmentVersion(String stormId, Runnable callback) throws 
Exception;
 
-public List blobstoreInfo(String blobKey);
+List blobstoreInfo(String blobKey);
 
-public List nimbuses();
+List nimbuses();
 
-public void addNimbusHost(String nimbusId, NimbusSummary 
nimbusSummary);
+void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
 
-public List activeStorms();
+List activeStorms();
 
 /**
  * Get a storm base for a topology
  * @param stormId the id of the topology
  * @param callback something to call if the data changes (best effort)
  * @return the StormBase or null if it is not alive.
  */
-public StormBase stormBase(String stormId, Runnable callback);
+StormBase stormBase(String stormId, Runnable callback);
 
-public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, 
String node, Long port);
+ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, 
Long port);
 
-public List getWorkerProfileRequests(String stormId, 
NodeInfo nodeInfo);
+List getWorkerProfileRequests(String stormId, NodeInfo 
nodeInfo);
 
-public List getTopologyProfileRequests(String stormId);
+List getTopologyProfileRequests(String stormId);
 
-public void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
+void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
 
-public void deleteTopologyProfileRequests(String stormId, 
ProfileRequest profileRequest);
+void deleteTopologyProfileRequests(String stormId, ProfileRequest 
profileRequest);
 
-public Map executorBeats(String stormId, 
Map executorNodePort);
+Map executorBeats(String stormId, 
Map executorNodePort);
 
-public List supervisors(Runnable callback);
+List supervisors(Runnable callback);
 
-public SupervisorInfo supervisorInfo(String supervisorId); // returns 
nil if doesn't exist
+SupervisorInfo supervisorInfo(String supervisorId); // returns nil if 
doesn't exist
 
-public void setupHeatbeats(String stormId);
+void setupHeatbeats(String stormId);
 
-public void teardownHeartbeats(String stormId);
+void teardownHeartbeats(String stormId);
 
-public void teardownTopologyErrors(String stormId);
+void teardownTopologyErrors(String stormId);
 
-public List heartbeatStorms();
+List heartbeatStorms();
 
-public List errorTopologies();
+List errorTopologies();
 
-public List backpressureTopologies();
+/** @deprecated: In Storm 2.0. Retained for enabling transition from 
1.x. Will be removed soon. */
--- End diff --

Filed STORM-2944. Can you please add 3.0 as a version in JIRA so that 
this issue can be associated with it?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167379385
  
--- Diff: pom.xml ---
@@ -259,6 +259,7 @@
 1.11
 4.3.3
 0.2.4
+2.0.1
--- End diff --

I don't see any references to jctools in any flux-related pom.  Can you 
point to the specific offending location?


---


Re: [VOTE] Release Apache Storm 1.2.0 (rc4)

2018-02-09 Thread Arun Iyer
+1 (binding)

- Downloaded and built the source distribution
- Downloaded the binary distributions
- Validated MD5
- Compiled starter topologies from binary distribution
- Ran ExclamationTopology and RollingTopWords
- Checked the metrics from Storm UI
- Checked logs via log viewer
- Listed and killed topologies from UI and command line

Thanks,
Arun


On 2/7/18, 8:16 PM, "Satish Duggana"  wrote:

>+1 (binding)
>
>src distribution
>  - Retrieved source archive and verified files
>  - Built the binary package from the above source archive
>
>bin distribution
>  - Ran different topologies in local cluster
>  - Created a 3 node cluster with worker slots.
>  - Deployed a few topologies
>  - Checked various options (like deactivate/kill/activate topology view
>etc) and monitoring stats in the UI for those topologies.
>  - Ran storm commands on those topologies like
>deactivate/rebalance/activate/kill with respective options.
>  - Killed some of the workers to check failover etc.
>  - Checked change log level settings for topologies.
>
>Thanks,
>Satish.
>
>On Thu, Feb 8, 2018 at 6:39 AM, Jungtaek Lim  wrote:
>
>> +1 (binding)
>>
>> > source
>>
>> - verify file (signature, MD5, SHA)
>> -- source, tar.gz : OK
>> -- source, zip : OK
>>
>> - extract file
>> -- source, tar.gz : OK
>> -- source, zip : OK
>>
>> - diff-ing extracted files between tar.gz and zip : OK
>>
>> - build source with JDK 7
>> -- source, tar.gz : OK
>>
>> > binary
>>
>> - verify file (signature, MD5, SHA)
>> -- binary, tar.gz : OK
>> -- binary, zip : OK
>>
>> - extract file
>> -- binary, tar.gz : OK
>> -- binary, zip : OK
>>
>> - diff-ing extracted files between tar.gz and zip : OK
>>
>> I already verified others from RC3, and also verified that the
>> storm-kafka-monitor source/javadoc files no longer exist in the toollib
>> directory.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Feb 8, 2018 at 7:24 AM, P. Taylor Goetz wrote:
>>
>> > This is a call to vote on releasing Apache Storm 1.2.0 (rc4)
>> >
>> > Note that the only difference between rc4 and rc3 is the fix for
>> > https://issues.apache.org/jira/browse/STORM-2942.
>> >
>> > Full list of changes in this release:
>> >
>> >
>> > https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.
>> 2.0-rc4/RELEASE_NOTES.html
>> >
>> > The tag/commit to be voted upon is v1.2.0:
>> >
>> >
>> > https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=
>> cef4d49e222e53656f38c40d754d4f41799cd9a9;hb=2a0097f9a20b9df494caadb87c87d4
>> e4db01a7ed
>> >
>> > The source archive being voted upon can be found here:
>> >
>> >
>> > https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.
>> 2.0-rc4/apache-storm-1.2.0-src.tar.gz
>> >
>> > Other release files, signatures and digests can be found here:
>> >
>> > https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.2.0-rc4/
>> >
>> > The release artifacts are signed with the following key:
>> >
>> >
>> > https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_
>> plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd
>> >
>> > The Nexus staging repository for this release is:
>> >
>> > https://repository.apache.org/content/repositories/orgapachestorm-1058
>> >
>> > Please vote on releasing this package as Apache Storm 1.2.0.
>> >
>> > When voting, please list the actions taken to verify the release.
>> >
>> > This vote will be open for at least 72 hours.
>> >
>> > [ ] +1 Release this package as Apache Storm 1.2.0
>> > [ ]  0 No opinion
>> > [ ] -1 Do not release this package because...
>> >
>> > Thanks to everyone who contributed to this release.
>> >
>> > -Taylor
>> >
>>


[GitHub] storm pull request #2551: [STORM-2940] Dynamically set the CAPACITY value of...

2018-02-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2551


---


[GitHub] storm issue #2551: [STORM-2940] Dynamically set the CAPACITY value of LoadAw...

2018-02-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2551
  
+1 again.


---


[GitHub] storm pull request #2551: [STORM-2940] Dynamically set the CAPACITY value of...

2018-02-09 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2551#discussion_r167374065
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
@@ -85,6 +85,7 @@ public void prepare(WorkerTopologyContext context, 
GlobalStreamId stream, List

Re: [VOTE] Release Apache Storm 1.0.6 (rc3)

2018-02-09 Thread P. Taylor Goetz
Doh! I didn’t even consider that it was likely autogenerated by an IDE 
and identical to what the JDK would generate.

I agree with Bobby, let’s leave it there and continue with the RC.

Thanks for pointing that out Bobby.

-Taylor

> On Feb 9, 2018, at 4:07 PM, Bobby Evans  wrote:
> 
> Actually it is set properly.  Do NOT change it.  It works just fine the way
> it is.
> 
> $ git checkout f6f35dd98d2492a38aa4d61da7f6caee4ec2f31a # The git version
> right before this change on branch-1.x
> $ mvn clean install -DskipTests
> $ serialver -classpath ./storm-core/target/storm-core-1.1.0-SNAPSHOT.jar
> org.apache.storm.tuple.Fields
> org.apache.storm.tuple.Fields:private static final long
> serialVersionUID = -3377931843059975424L;
> 
> This is the same version that we set it to as a part of the change.
> 
> - Bobby
> 
> On Fri, Feb 9, 2018 at 3:00 PM Bobby Evans  wrote:
> 
>> Fields changing will cause problems if it is serialized as part of a
>> bolt/spout or as part of a custom grouping.  I have not checked explicitly,
>> but removing that line is the wrong thing to do.  By default the 
>> serialVersionUID
>> is generated from the class itself, so removing it, but leaving in the
>> modified code would still break backwards compatibility.
>> 
>> I'll take a look and see what you need to set it to so you don't break
>> backwards compatibility on 1.x
>> 
>> On Fri, Feb 9, 2018 at 10:19 AM P. Taylor Goetz  wrote:
>> 
>>> What are others’ opinions on removing the serialVersionUID and moving
>>> ahead with an RC4?
>>> 
>>> -Taylor
>>> 
 On Feb 9, 2018, at 7:21 AM, Jungtaek Lim  wrote:
 
 I just went ahead verifying current RC except serialization UID issue in
 Fields. I could also vote for RC4 immediately if necessary.
 
 +1 (binding)
 
> source
 
 - verify file (signature, MD5, SHA)
 -- source, tar.gz : OK
 -- source, zip : OK
 
 - extract file
 -- source, tar.gz : OK
 -- source, zip : OK
 
 - diff-ing extracted files between tar.gz and zip : OK
 
 - build source with JDK 7
 -- source, tar.gz : OK
 
 - build source dist
 -- source, tar.gz : OK
 
 - build binary dist
 -- source, tar.gz : OK
 
> binary
 
 - verify file (signature, MD5, SHA)
 -- binary, tar.gz : OK
 -- binary, zip : OK
 
 - extract file
 -- binary, tar.gz : OK
 -- binary, zip : OK
 
 - diff-ing extracted files between tar.gz and zip : OK
 
 - launch daemons : OK
 
 - run RollingTopWords (local) : OK
 
 - run RollingTopWords (remote) : OK
 - activate / deactivate / rebalance / kill : OK
 - logviewer (worker dir, daemon dir) : OK
 - change log level : OK
 - thread dump, heap dump, restart worker : OK
 - log search : OK
 
 Thanks,
 Jungtaek Lim (HeartSaVioR)
 
 On Fri, Feb 9, 2018 at 6:18 PM, Erik Weathers wrote:
 
> I'm fine submitting a PR to back that line out (or any of you committer
> folks could just rip it out).
> 
> But I'd like to understand Storm a bit better as part of making this
> decision. :-)  Am I correct in assuming it would only be a problem if
>>> the
> serialized Fields were stored somewhere (e.g., ZooKeeper, local
>>> filesystem)
> and then read back in after the Nimbus/Workers are brought back up
>>> after
> the upgrade?  Seems Fields is used in a *lot* of places, and I don't
>>> know
> precisely what is serialized for reuse upon Storm Nimbus/Worker daemon
> restarts.  I believe there are examples of Fields being used to create
> Spout or Bolt objects that are used to create the StormTopology object,
> which I believe is serialized into ZooKeeper.  But I'm not clear if
>>> it's
> directly the Fields object itself or some kind of translation from that
> into the thrift objects that make up StormTopology.
> 
> I also don't know exactly when kryo is applicable in Storm.  I've never
> done anything with kryo directly.
> 
> - Erik
> 
> On Thu, Feb 8, 2018 at 10:00 PM, P. Taylor Goetz 
> wrote:
> 
>> *serialized* ;)
>> 
>>> On Feb 9, 2018, at 12:48 AM, P. Taylor Goetz 
> wrote:
>>> 
>>> I’d have to check (can’t right now), but I think that class gets
>> sterilized via kryo. If that’s not the case, yes, it could cause
> problems.
>>> 
>>> I think the safest option would be to remove the serialversionuid.
>>> 
>>> -Taylor
>>> 
 On Feb 8, 2018, at 5:36 PM, Erik Weathers
> 
>> wrote:
 
 Something I just realized -- in the storm-kafka-client stomping into
 1.0.x-branch PR, I backported a change to Fields.java which added a
 serialVersionUID.

Re: [VOTE] Release Apache Storm 1.0.6 (rc3)

2018-02-09 Thread Bobby Evans
Actually it is set properly.  Do NOT change it.  It works just fine the way
it is.

$ git checkout f6f35dd98d2492a38aa4d61da7f6caee4ec2f31a # The git version
right before this change on branch-1.x
$ mvn clean install -DskipTests
$ serialver -classpath ./storm-core/target/storm-core-1.1.0-SNAPSHOT.jar
org.apache.storm.tuple.Fields
org.apache.storm.tuple.Fields:private static final long
serialVersionUID = -3377931843059975424L;

This is the same version that we set it to as a part of the change.

- Bobby
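The point Bobby demonstrates with `serialver` can also be checked from plain JDK APIs (a sketch using an illustrative stand-in class, not the real org.apache.storm.tuple.Fields): java.io.ObjectStreamClass reports the effective UID, which is the explicit one if declared, otherwise the same JDK-computed default that `serialver` prints, so pinning the previously generated value in the source keeps old serialized instances compatible.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.List;

public class SerialVersionCheck {
    // Illustrative stand-in for a class like org.apache.storm.tuple.Fields
    static class MyFields implements Serializable {
        // Pinning the UID to the previously generated value preserves compatibility
        private static final long serialVersionUID = 42L;
        private List<String> fields;
    }

    public static void main(String[] args) {
        // Reports the explicit UID if declared, else the computed default
        long uid = ObjectStreamClass.lookup(MyFields.class).getSerialVersionUID();
        System.out.println(uid); // prints 42
    }
}
```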

On Fri, Feb 9, 2018 at 3:00 PM Bobby Evans  wrote:

> Fields changing will cause problems if it is serialized as part of a
> bolt/spout or as part of a custom grouping.  I have not checked explicitly,
> but removing that line is the wrong thing to do.  By default the 
> serialVersionUID
> is generated from the class itself, so removing it, but leaving in the
> modified code would still break backwards compatibility.
>
> I'll take a look and see what you need to set it to so you don't break
> backwards compatibility on 1.x
>
> On Fri, Feb 9, 2018 at 10:19 AM P. Taylor Goetz  wrote:
>
>> What are others’ opinions on removing the serialVersionUID and moving
>> ahead with an RC4?
>>
>> -Taylor
>>
>> > On Feb 9, 2018, at 7:21 AM, Jungtaek Lim  wrote:
>> >
>> > I just went ahead verifying current RC except serialization UID issue in
>> > Fields. I could also vote for RC4 immediately if necessary.
>> >
>> > +1 (binding)
>> >
>> >> source
>> >
>> > - verify file (signature, MD5, SHA)
>> > -- source, tar.gz : OK
>> > -- source, zip : OK
>> >
>> > - extract file
>> > -- source, tar.gz : OK
>> > -- source, zip : OK
>> >
>> > - diff-ing extracted files between tar.gz and zip : OK
>> >
>> > - build source with JDK 7
>> > -- source, tar.gz : OK
>> >
>> > - build source dist
>> > -- source, tar.gz : OK
>> >
>> > - build binary dist
>> > -- source, tar.gz : OK
>> >
>> >> binary
>> >
>> > - verify file (signature, MD5, SHA)
>> > -- binary, tar.gz : OK
>> > -- binary, zip : OK
>> >
>> > - extract file
>> > -- binary, tar.gz : OK
>> > -- binary, zip : OK
>> >
>> > - diff-ing extracted files between tar.gz and zip : OK
>> >
>> > - launch daemons : OK
>> >
>> > - run RollingTopWords (local) : OK
>> >
>> > - run RollingTopWords (remote) : OK
>> >  - activate / deactivate / rebalance / kill : OK
>> >  - logviewer (worker dir, daemon dir) : OK
>> >  - change log level : OK
>> >  - thread dump, heap dump, restart worker : OK
>> >  - log search : OK
>> >
>> > Thanks,
>> > Jungtaek Lim (HeartSaVioR)
>> >
>> > On Fri, Feb 9, 2018 at 6:18 PM, Erik Weathers wrote:
>> >
>> >> I'm fine submitting a PR to back that line out (or any of you committer
>> >> folks could just rip it out).
>> >>
>> >> But I'd like to understand Storm a bit better as part of making this
>> >> decision. :-)  Am I correct in assuming it would only be a problem if
>> the
>> >> serialized Fields were stored somewhere (e.g., ZooKeeper, local
>> filesystem)
>> >> and then read back in after the Nimbus/Workers are brought back up
>> after
>> >> the upgrade?  Seems Fields is used in a *lot* of places, and I don't
>> know
>> >> precisely what is serialized for reuse upon Storm Nimbus/Worker daemon
>> >> restarts.  I believe there are examples of Fields being used to create
>> >> Spout or Bolt objects that are used to create the StormTopology object,
>> >> which I believe is serialized into ZooKeeper.  But I'm not clear if
>> it's
>> >> directly the Fields object itself or some kind of translation from that
>> >> into the thrift objects that make up StormTopology.
>> >>
>> >> I also don't know exactly when kryo is applicable in Storm.  I've never
>> >> done anything with kryo directly.
>> >>
>> >> - Erik
>> >>
>> >> On Thu, Feb 8, 2018 at 10:00 PM, P. Taylor Goetz 
>> >> wrote:
>> >>
>> >>> *serialized* ;)
>> >>>
>>  On Feb 9, 2018, at 12:48 AM, P. Taylor Goetz 
>> >> wrote:
>> 
>>  I’d have to check (can’t right now), but I think that class gets
>> >>> sterilized via kryo. If that’s not the case, yes, it could cause
>> >> problems.
>> 
>>  I think the safest option would be to remove the serialversionuid.
>> 
>>  -Taylor
>> 
>> > On Feb 8, 2018, at 5:36 PM, Erik Weathers
>> >> 
>> >>> wrote:
>> >
>> > Something I just realized -- in the storm-kafka-client stomping into
>> > 1.0.x-branch PR, I backported a change to Fields.java which added a
>> > serialVersionUID.
>> > Could that potentially break topologies when you upgrade storm-core
>> on
>> >>> the
>> > servers (nimbus, workers) from 1.0.{1..5} to 1.0.6?   I'm not super
>> > familiar with the serialization that occurs in Storm and whether
>> that
>> >>> could
>> > break people.
>> >
>> > https://github.com/apache/storm/pull/2550/files#diff-71a428d
>> >>> 508c4f5af0bfe3cc186e8edcf
>> >
>> 

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167286165
  
--- Diff: pom.xml ---
@@ -259,6 +259,7 @@
 1.11
 4.3.3
 0.2.4
+2.0.1
--- End diff --

Why does flux need jctools?  Shouldn't it come with storm-client?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167328981
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -44,14 +48,15 @@
 private final Boolean isEventLoggers;
 private final Boolean isDebug;
 private final RotatingMap pending;
+private TupleInfo globalTupleInfo = new TupleInfo();  // thread 
safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --

This feels like a good assumption for a spout, but I would like to 
understand the cost of making this thread safe (thread-local instance, etc.), and 
at least document it if that cost is high, or preferably find a low-cost 
solution to throw an exception if it does happen.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167286543
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -890,30 +871,91 @@
 public static final String TOPOLOGY_ISOLATED_MACHINES = 
"topology.isolate.machines";
 
 /**
- * Configure timeout milliseconds used for disruptor queue wait 
strategy. Can be used to tradeoff latency
- * vs. CPU usage
+ * Selects the Bolt's Wait Strategy to use when there are no incoming 
msgs. Used to trade off latency vs CPU usage.
+ */
+@isString
--- End diff --

Can we check if this is an instance of the proper parent interface?
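A sketch of the kind of check being asked for (the interface and validator here are illustrative stand-ins, not Storm's actual IWaitStrategy or config-validation API): rather than only validating that the config value is a String, resolve the class name and verify it is assignable to the expected parent interface.

```java
public class WaitStrategyConfigCheck {
    // Illustrative stand-in for org.apache.storm.policy.IWaitStrategy
    public interface IWaitStrategy { void idle(); }

    public static class SleepWaitStrategy implements IWaitStrategy {
        public void idle() { /* e.g. a short parked sleep */ }
    }

    // Validate that a configured class name actually implements the interface
    public static void validateWaitStrategy(String className) throws ClassNotFoundException {
        Class<?> clazz = Class.forName(className);
        if (!IWaitStrategy.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(
                className + " does not implement " + IWaitStrategy.class.getName());
        }
    }

    public static void main(String[] args) throws Exception {
        validateWaitStrategy("WaitStrategyConfigCheck$SleepWaitStrategy"); // passes
        try {
            validateWaitStrategy("java.lang.String"); // wrong parent interface
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```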


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167325679
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.Config;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.serialization.ITupleSerializer;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.TransferDrainer;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Utils.SmartThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+// Transfers messages destined to other workers
+class WorkerTransfer implements JCQueue.Consumer {
+static final Logger LOG = 
LoggerFactory.getLogger(WorkerTransfer.class);
+
+private final TransferDrainer drainer;
+private WorkerState workerState;
+private IWaitStrategy backPressureWaitStrategy;
+
+JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries 
maybe null (if no emits to those tasksIds from this worker)
+AtomicBoolean[] remoteBackPressureStatus; // [[remoteTaskId] -> 
true/false : indicates if remote task is under BP.
--- End diff --

Same here for package private.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167288058
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Acker.java ---
@@ -99,17 +100,19 @@ public void execute(Tuple input) {
 pending.put(id, curr);
 } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
 resetTimeout = true;
-if (curr != null) {
+if (curr == null) {
--- End diff --

Why is this change being made?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167286681
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -890,30 +871,91 @@
 public static final String TOPOLOGY_ISOLATED_MACHINES = 
"topology.isolate.machines";
 
 /**
- * Configure timeout milliseconds used for disruptor queue wait 
strategy. Can be used to tradeoff latency
- * vs. CPU usage
+ * Selects the Bolt's Wait Strategy to use when there are no incoming 
msgs. Used to trade off latency vs CPU usage.
+ */
+@isString
+public static final String TOPOLOGY_BOLT_WAIT_STRATEGY = 
"topology.bolt.wait.strategy";
+
+/**
+ * Configures park time for WaitStrategyPark.  If set to 0, returns 
immediately (i.e busy wait).
  */
-@isInteger
 @NotNull
-public static final String 
TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
+@isPositiveNumber(includeZero = true)
+public static final String TOPOLOGY_BOLT_WAIT_PARK_MICROSEC = 
"topology.bolt.wait.park.microsec";
 
 /**
- * The number of tuples to batch before sending to the next thread.  
This number is just an initial suggestion and
- * the code may adjust it as your topology runs.
+ * Configures number of iterations to spend in level 1 of 
WaitStrategyProgressive, before progressing to level 2
  */
+@NotNull
 @isInteger
 @isPositiveNumber
+public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT 
=  "topology.bolt.wait.progressive.level1.count";
+
+/**
+ * Configures number of iterations to spend in level 2 of 
WaitStrategyProgressive, before progressing to level 3
+ */
 @NotNull
-public static final String 
TOPOLOGY_DISRUPTOR_BATCH_SIZE="topology.disruptor.batch.size";
+@isInteger
+@isPositiveNumber
+public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT 
=  "topology.bolt.wait.progressive.level2.count";
 
 /**
- * The maximum age in milliseconds a batch can be before being sent to 
the next thread.  This number is just an
- * initial suggestion and the code may adjust it as your topology runs.
+ * Configures sleep time for WaitStrategyProgressive.
  */
+@NotNull
+@isPositiveNumber(includeZero = true)
+public static final String 
TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = 
"topology.bolt.wait.progressive.level3.sleep.millis";
+
+
+/**
+ * A class that implements a wait strategy for an upstream component 
(spout/bolt) trying to write to a downstream component
+ * whose recv queue is full
+ *
+ * 1. nextTuple emits no tuples
+ * 2. The spout has hit maxSpoutPending and can't emit any more tuples
+ */
+@isString
+public static final String 
TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY="topology.backpressure.wait.strategy";
--- End diff --

Here too: if this is a class name, it would be good to verify early on that 
it is an instance of the expected class.
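The early check being asked for could look like the sketch below: resolve the configured class name once at startup and fail fast if it does not implement the expected interface. `IWaitStrategy` is declared locally as a stand-in for `org.apache.storm.policy.IWaitStrategy` so the snippet is self-contained; the helper name and wiring are assumptions, not Storm's actual validation code:

```java
// Local stand-in for org.apache.storm.policy.IWaitStrategy (assumption).
interface IWaitStrategy {
    void prepare();
}

// An example strategy that passes the check.
class ParkWaitStrategy implements IWaitStrategy {
    public void prepare() { }
}

public class WaitStrategyValidator {
    /** Resolves className and verifies it implements expected, failing fast otherwise. */
    static <T> Class<? extends T> validate(String className, Class<T> expected) {
        try {
            Class<?> klass = Class.forName(className);
            if (!expected.isAssignableFrom(klass)) {
                throw new IllegalArgumentException(className
                        + " does not implement " + expected.getName());
            }
            return klass.asSubclass(expected);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No such class: " + className, e);
        }
    }

    public static void main(String[] args) {
        // A real strategy class resolves and type-checks.
        validate(ParkWaitStrategy.class.getName(), IWaitStrategy.class);
        // A class of the wrong type is rejected at startup, not at first use.
        try {
            validate("java.lang.String", IWaitStrategy.class);
            throw new AssertionError("expected rejection");
        } catch (IllegalArgumentException expected) { }
        System.out.println("ok");
    }
}
```

Running such a check at topology submission (or worker boot) turns a confusing mid-run `ClassCastException` into an immediate, descriptive failure.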


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167325584
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.Config;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.serialization.ITupleSerializer;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.TransferDrainer;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Utils.SmartThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+// Transfers messages destined to other workers
+class WorkerTransfer implements JCQueue.Consumer {
+static final Logger LOG = 
LoggerFactory.getLogger(WorkerTransfer.class);
+
+private final TransferDrainer drainer;
+private WorkerState workerState;
+private IWaitStrategy backPressureWaitStrategy;
+
+JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries 
maybe null (if no emits to those tasksIds from this worker)
--- End diff --

Nit: why does this need to be package private? Can we lock it down more?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167282875
  
--- Diff: docs/Concepts.md ---
@@ -113,3 +113,8 @@ Topologies execute across one or more worker processes. 
Each worker process is a
 **Resources:**
 
 * 
[Config.TOPOLOGY_WORKERS](javadocs/org/apache/storm/Config.html#TOPOLOGY_WORKERS):
 this config sets the number of workers to allocate for executing the topology
+
+### Performance Tuning
+
+Refer to [performance tuning guide](docs/CONTRIBUTING.md)
--- End diff --

I don't think `CONTRIBUTING.md` is the performance tuning guide. Also, the 
path is relative, and since `Performance.md` is in the same directory as this 
file we don't need the `docs/` prefix in the link.


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167287026
  
--- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java ---
@@ -97,6 +97,8 @@ public void run() {
 // events.
 Time.sleep(1000);
 }
+if(Thread.interrupted())
+this.active.set(false);
--- End diff --

Nit: can we wrap this in braces (`{` and `}`) and add a space after the `if`, 
to match the style guide?
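Separate from the style nit, one behavior of the call in this diff worth keeping in mind: `Thread.interrupted()` reports and clears the thread's interrupt flag, so the result has to be acted on immediately (as the diff does by setting `active`). A JDK-only demonstration:

```java
public class InterruptFlagDemo {
    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        boolean first = Thread.interrupted();  // true, and clears the flag
        boolean second = Thread.interrupted(); // false: the flag is already gone
        System.out.println(first + " " + second); // prints "true false"
    }
}
```

If the flag should survive the check, `Thread.currentThread().isInterrupted()` is the non-clearing alternative.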


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167330523
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java 
---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream 
WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --

Why does this need to be serializable if we are explicitly using kryo for 
all of the serialization?  Can we just register the class with kryo instead so 
this will also work when java serialization is disabled?
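For context on the question: Kryo does not require `java.io.Serializable` at all; registering the class (in raw Kryo terms, `kryo.register(BackPressureStatus.class)`, presumably hooked into wherever Storm builds its Kryo instances) is sufficient. Plain Java serialization, by contrast, rejects a non-`Serializable` object outright, which is the only thing the `implements` clause buys. A JDK-only sketch of that rejection, using a stand-in class:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

// Stand-in for BackPressureStatus; deliberately NOT Serializable.
class BackPressureStatusLike {
    int workerId = 7;
}

public class SerializableDemo {
    public static void main(String[] args) throws IOException {
        try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new BackPressureStatusLike());
            throw new AssertionError("expected NotSerializableException");
        } catch (NotSerializableException expected) {
            // Java serialization refuses the object; Kryo would not care.
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

So dropping the interface and registering the class with Kryo, as suggested, also keeps the message working when Java serialization is disabled.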


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167331543
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java 
---
@@ -194,7 +194,12 @@ public void reportError(Throwable t) {
 public void emitDirect(int task, String stream, List 
values, Object id) {
 throw new UnsupportedOperationException("Trident does not 
support direct streams");
 }
-
+
+@Override
+public void flush() {
+//NOOP   //TODO: Roshan: validate if this is OK
--- End diff --

The TODO needs to go away. Also, is it OK not to have a flush here?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167326878
  
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -313,107 +330,74 @@ public void metricsTick(Task taskData, TupleImpl 
tuple) {
 protected void setupMetrics() {
 for (final Integer interval : 
intervalToTaskToMetricToRegistry.keySet()) {
 StormTimer timerTask = workerData.getUserTimer();
-timerTask.scheduleRecurring(interval, interval, new Runnable() 
{
-@Override
-public void run() {
-TupleImpl tuple = new TupleImpl(workerTopologyContext, 
new Values(interval),
-(int) Constants.SYSTEM_TASK_ID, 
Constants.METRICS_TICK_STREAM_ID);
-List metricsTickTuple =
-Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
-receiveQueue.publish(metricsTickTuple);
+timerTask.scheduleRecurring(interval, interval,
+() -> {
+TupleImpl tuple = new TupleImpl(workerTopologyContext, 
new Values(interval), Constants.SYSTEM_COMPONENT_ID,
+(int) Constants.SYSTEM_TASK_ID, 
Constants.METRICS_TICK_STREAM_ID);
+AddressedTuple metricsTickTuple = new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+try {
+receiveQueue.publish(metricsTickTuple);
+receiveQueue.flush();  // avoid buffering
+} catch (InterruptedException e) {
+LOG.warn("Thread interrupted when publishing 
metrics. Setting interrupt flag.");
+Thread.currentThread().interrupt();
+return;
+}
 }
-});
-}
-}
-
-public void sendUnanchored(Task task, String stream, List 
values, ExecutorTransfer transfer) {
-Tuple tuple = task.getTuple(stream, values);
-List tasks = task.getOutgoingTasks(stream, values);
-for (Integer t : tasks) {
-transfer.transfer(t, tuple);
-}
-}
-
-/**
- * Send sampled data to the eventlogger if the global or component 
level debug flag is set (via nimbus api).
- */
-public void sendToEventLogger(Executor executor, Task taskData, List 
values,
-  String componentId, Object messageId, 
Random random) {
-Map componentDebug = 
executor.getStormComponentDebug().get();
-DebugOptions debugOptions = componentDebug.get(componentId);
-if (debugOptions == null) {
-debugOptions = componentDebug.get(executor.getStormId());
-}
-double spct = ((debugOptions != null) && 
(debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
-if (spct > 0 && (random.nextDouble() * 100) < spct) {
-sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
-new Values(componentId, messageId, 
System.currentTimeMillis(), values),
-executor.getExecutorTransfer());
+);
 }
 }
 
-public void reflectNewLoadMapping(LoadMapping loadMapping) {
-for (LoadAwareCustomStreamGrouping g : groupers) {
-g.refreshLoad(loadMapping);
-}
-}
-
-private void registerBackpressure() {
-receiveQueue.registerBackpressureCallback(new 
DisruptorBackpressureCallback() {
-@Override
-public void highWaterMark() throws Exception {
-LOG.debug("executor " + executorId + " is congested, set 
backpressure flag true");
-
WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
-}
-
-@Override
-public void lowWaterMark() throws Exception {
-LOG.debug("executor " + executorId + " is not-congested, 
set backpressure flag false");
-
WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
-}
-});
-
receiveQueue.setHighWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
-
receiveQueue.setLowWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
receiveQueue.setEnableBackpressure(ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE),
 false));
-}
-
 protected void setupTicks(boolean 

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167293987
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
@@ -456,137 +450,135 @@ public void refreshStormActive(Runnable callback) {
 }
 }
 
-public void refreshThrottle() {
-boolean backpressure = 
stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, 
this::refreshThrottle);
-this.throttleOn.set(backpressure);
-}
-
-private static double getQueueLoad(DisruptorQueue q) {
-DisruptorQueue.QueueMetrics qMetrics = q.getMetrics();
+private static double getQueueLoad(JCQueue q) {
+JCQueue.QueueMetrics qMetrics = q.getMetrics();
 return ((double) qMetrics.population()) / qMetrics.capacity();
 }
 
 public void refreshLoad(List execs) {
-Set remoteTasks = Sets.difference(new 
HashSet<>(outboundTasks), new HashSet<>(taskIds));
+Set remoteTasks = Sets.difference(new 
HashSet<>(outboundTasks), new HashSet<>(localTaskIds));
 Long now = System.currentTimeMillis();
 Map localLoad = new HashMap<>();
-for (IRunningExecutor exec: execs) {
+for (IRunningExecutor exec : execs) {
 double receiveLoad = getQueueLoad(exec.getReceiveQueue());
-double sendLoad = getQueueLoad(exec.getSendQueue());
-localLoad.put(exec.getExecutorId().get(0).intValue(), 
Math.max(receiveLoad, sendLoad));
+localLoad.put(exec.getExecutorId().get(0).intValue(), 
receiveLoad);
 }
 
 Map remoteLoad = new HashMap<>();
 cachedNodeToPortSocket.get().values().stream().forEach(conn -> 
remoteLoad.putAll(conn.getLoad(remoteTasks)));
 loadMapping.setLocal(localLoad);
 loadMapping.setRemote(remoteLoad);
 
-if (now > nextUpdate.get()) {
+if (now > nextLoadUpdate.get()) {
 receiver.sendLoadMetrics(localLoad);
-nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
+nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
 }
 }
 
+// checks if the tasks which had back pressure are now free again. if 
so, sends an update to other workers
+public void refreshBackPressureStatus() {
+LOG.debug("Checking for change in Backpressure status on worker's 
tasks");
+boolean bpSituationChanged = bpTracker.refreshBpTaskList();
+if (bpSituationChanged) {
+BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+receiver.sendBackPressureStatus(bpStatus);
+}
+}
+
+
 /**
  * we will wait all connections to be ready and then activate the 
spout/bolt
  * when the worker bootup.
  */
 public void activateWorkerWhenAllConnectionsReady() {
 int delaySecs = 0;
 int recurSecs = 1;
-refreshActiveTimer.schedule(delaySecs, new Runnable() {
-@Override public void run() {
+refreshActiveTimer.schedule(delaySecs,
+() -> {
 if (areAllConnectionsReady()) {
 LOG.info("All connections are ready for worker {}:{} 
with id {}", assignmentId, port, workerId);
 isWorkerActive.set(Boolean.TRUE);
 } else {
 refreshActiveTimer.schedule(recurSecs, () -> 
activateWorkerWhenAllConnectionsReady(), false, 0);
 }
 }
-});
+);
 }
 
 public void registerCallbacks() {
 LOG.info("Registering IConnectionCallbacks for {}:{}", 
assignmentId, port);
 receiver.registerRecv(new 
DeserializingConnectionCallback(topologyConf,
 getWorkerTopologyContext(),
-this::transferLocal));
+this::transferLocalBatch));
+// Send curr BackPressure status to new clients
+receiver.registerNewConnectionResponse(
+() -> {
+BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+LOG.info("Sending BackPressure status to new client. 
BPStatus: {}", bpStatus);
+return bpStatus;
+}
+);
 }
 
-public void transferLocal(List tupleBatch) {
-Map grouped = new HashMap<>();
-for (AddressedTuple tuple : tupleBatch) {
-Integer executor = taskToShortExecutor.get(tuple.dest);
-if (null == executor) {
-LOG.warn("Received invalid messages for unknown tasks. 
Dropping... ");
-

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167287393
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
@@ -40,109 +40,117 @@
 import org.apache.storm.nimbus.NimbusInfo;
 
 public interface IStormClusterState {
-public List assignments(Runnable callback);
+List assignments(Runnable callback);
 
-public Assignment assignmentInfo(String stormId, Runnable callback);
+Assignment assignmentInfo(String stormId, Runnable callback);
 
-public VersionedData assignmentInfoWithVersion(String 
stormId, Runnable callback);
+VersionedData assignmentInfoWithVersion(String stormId, 
Runnable callback);
 
-public Integer assignmentVersion(String stormId, Runnable callback) 
throws Exception;
+Integer assignmentVersion(String stormId, Runnable callback) throws 
Exception;
 
-public List blobstoreInfo(String blobKey);
+List blobstoreInfo(String blobKey);
 
-public List nimbuses();
+List nimbuses();
 
-public void addNimbusHost(String nimbusId, NimbusSummary 
nimbusSummary);
+void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
 
-public List activeStorms();
+List activeStorms();
 
 /**
  * Get a storm base for a topology
  * @param stormId the id of the topology
  * @param callback something to call if the data changes (best effort)
  * @return the StormBase or null if it is not alive.
  */
-public StormBase stormBase(String stormId, Runnable callback);
+StormBase stormBase(String stormId, Runnable callback);
 
-public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, 
String node, Long port);
+ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, 
Long port);
 
-public List getWorkerProfileRequests(String stormId, 
NodeInfo nodeInfo);
+List getWorkerProfileRequests(String stormId, NodeInfo 
nodeInfo);
 
-public List getTopologyProfileRequests(String stormId);
+List getTopologyProfileRequests(String stormId);
 
-public void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
+void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
 
-public void deleteTopologyProfileRequests(String stormId, 
ProfileRequest profileRequest);
+void deleteTopologyProfileRequests(String stormId, ProfileRequest 
profileRequest);
 
-public Map executorBeats(String stormId, 
Map executorNodePort);
+Map executorBeats(String stormId, 
Map executorNodePort);
 
-public List supervisors(Runnable callback);
+List supervisors(Runnable callback);
 
-public SupervisorInfo supervisorInfo(String supervisorId); // returns 
nil if doesn't exist
+SupervisorInfo supervisorInfo(String supervisorId); // returns nil if 
doesn't exist
 
-public void setupHeatbeats(String stormId);
+void setupHeatbeats(String stormId);
 
-public void teardownHeartbeats(String stormId);
+void teardownHeartbeats(String stormId);
 
-public void teardownTopologyErrors(String stormId);
+void teardownTopologyErrors(String stormId);
 
-public List heartbeatStorms();
+List heartbeatStorms();
 
-public List errorTopologies();
+List errorTopologies();
 
-public List backpressureTopologies();
+/** @deprecated: In Storm 2.0. Retained for enabling transition from 
1.x. Will be removed soon. */
--- End diff --

Could you file a JIRA for us to remove it in 3.x?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167288382
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -362,6 +362,8 @@ public static void addSystemStreams(StormTopology 
topology) {
 public static void addEventLogger(Map conf, 
StormTopology topology) {
 Integer numExecutors = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
 ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+if(numExecutors==null || numExecutors==0)
--- End diff --

Nit: can we fix the style here with a space after the `if` and braces (`{` and `}`)?


---


[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

2018-02-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2502#discussion_r167285848
  
--- Diff: 
flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvironmentTest.java
 ---
@@ -18,6 +18,7 @@
 package org.apache.storm.flux.multilang;
 
 
+import org.junit.Ignore;
--- End diff --

Nit: I don't think this import is used here, so can we remove it?


---


Re: [VOTE] Release Apache Storm 1.0.6 (rc3)

2018-02-09 Thread P. Taylor Goetz
What are others’ opinions on removing the serialVersionUID and moving ahead with 
an RC4?

-Taylor

> On Feb 9, 2018, at 7:21 AM, Jungtaek Lim  wrote:
> 
> I just went ahead verifying current RC except serialization UID issue in
> Fields. I could also vote for RC4 immediately if necessary.
> 
> +1 (binding)
> 
>> source
> 
> - verify file (signature, MD5, SHA)
> -- source, tar.gz : OK
> -- source, zip : OK
> 
> - extract file
> -- source, tar.gz : OK
> -- source, zip : OK
> 
> - diff-ing extracted files between tar.gz and zip : OK
> 
> - build source with JDK 7
> -- source, tar.gz : OK
> 
> - build source dist
> -- source, tar.gz : OK
> 
> - build binary dist
> -- source, tar.gz : OK
> 
>> binary
> 
> - verify file (signature, MD5, SHA)
> -- binary, tar.gz : OK
> -- binary, zip : OK
> 
> - extract file
> -- binary, tar.gz : OK
> -- binary, zip : OK
> 
> - diff-ing extracted files between tar.gz and zip : OK
> 
> - launch daemons : OK
> 
> - run RollingTopWords (local) : OK
> 
> - run RollingTopWords (remote) : OK
>  - activate / deactivate / rebalance / kill : OK
>  - logviewer (worker dir, daemon dir) : OK
>  - change log level : OK
>  - thread dump, heap dump, restart worker : OK
>  - log search : OK
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> On Fri, Feb 9, 2018 at 6:18 PM, Erik Weathers wrote:
> 
>> I'm fine submitting a PR to back that line out (or any of you committer
>> folks could just rip it out).
>> 
>> But I'd like to understand Storm a bit better as part of making this
>> decision. :-)  Am I correct in assuming it would only be a problem if the
>> serialized Fields were stored somewhere (e.g., ZooKeeper, local filesystem)
>> and then read back in after the Nimbus/Workers are brought back up after
>> the upgrade?  Seems Fields is used in a *lot* of places, and I don't know
>> precisely what is serialized for reused upon Storm Nimbus/Worker daemon
>> restarts.  I believe there are examples of Fields being used to create
>> Spout or Bolt objects that are used to create the StormTopology object,
>> which I believe is serialized into ZooKeeper.  But I'm not clear if it's
>> directly the Fields object itself or some kind of translation from that
>> into the thrift objects that make up StormTopology.
>> 
>> I also don't know exactly when kryo is applicable in Storm.  I've never
>> done anything with kryo directly.
>> 
>> - Erik
>> 
>> On Thu, Feb 8, 2018 at 10:00 PM, P. Taylor Goetz 
>> wrote:
>> 
>>> *serialized* ;)
>>> 
 On Feb 9, 2018, at 12:48 AM, P. Taylor Goetz 
>> wrote:
 
 I’d have to check (can’t right now), but I think that class gets
>>> sterilized via kryo. If that’s not the case, yes, it could cause
>> problems.
 
 I think the safest option would be to remove the serialversionuid.
 
 -Taylor
 
> On Feb 8, 2018, at 5:36 PM, Erik Weathers
>> 
>>> wrote:
> 
> Something I just realized -- in the storm-kafka-client stomping into
> 1.0.x-branch PR, I backported a change to Fields.java which added a
> serialVersionUID.
> Could that potentially break topologies when you upgrade storm-core on
>>> the
> servers (nimbus, workers) from 1.0.{1..5} to 1.0.6?   I'm not super
> familiar with the serialization that occurs in Storm and whether that
>>> could
> break people.
> 
> https://github.com/apache/storm/pull/2550/files#diff-71a428d
>>> 508c4f5af0bfe3cc186e8edcf
> 
> - Erik
> 
>> On Thu, Feb 8, 2018 at 1:25 PM, Bobby Evans 
>>> wrote:
>> 
>> +1 I built the code from the git tag, ran all the unit tests (which
>>> passed
>> the first time), and ran some tests on a single node cluster.
>> 
>> It all looked good.
>> 
>> - Bobby
>> 
>>> On Thu, Feb 8, 2018 at 1:22 PM P. Taylor Goetz 
>>> wrote:
>>> 
>>> This is a call to vote on releasing Apache Storm 1.0.6 (rc3)
>>> 
>>> Full list of changes in this release:
>>> 
>>> 
>>> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.
>> 0.6-rc3/RELEASE_NOTES.html
>>> 
>>> The tag/commit to be voted upon is v1.0.6:
>>> 
>>> 
>>> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=
>> e68365f9f947ddd1794b2edef2149fdfaa1590a2;hb=7993db01580ce62d
>>> 44866dc00e0a72
>> 66984638d0
>>> 
>>> The source archive being voted upon can be found here:
>>> 
>>> 
>>> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.
>> 0.6-rc3/apache-storm-1.0.6-src.tar.gz
>>> 
>>> Other release files, signatures and digests can be found here:
>>> 
>>> 
>> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.6-rc3/
>>> 
>>> The release artifacts are signed with the following key:
>>> 
>>> 
>>> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_
>> 

Re: [DISCUSS] Decouple Storm core and connectors

2018-02-09 Thread Xin Wang
I agree with Jungtaek. The same situation has come up again with RocketMQ
(https://github.com/apache/storm/pull/2518).
The following is my advice.

1. Storm now has too many connectors; we can separate the first-class
connectors from the others. The following is a possible split covering all
existing connectors.

First class:

   - Kafka,
   - HDFS,
   - HBase,
   - Hive,
   - Redis,
   - JDBC,
   - JMS



Others:

   - Solr,
   - Cassandra,
   - Elasticsearch,
   - Event Hubs
   - RocketMQ
   - MongoDB
   - OpenTSDB
   - Kinesis
   - Druid
   - MQTT,
   - PMML


2. For the first-class connectors we can leave the code as it is but release
them independently; for the other connectors, I prefer moving them to Bahir,
the way Spark/Flink do. We can talk with the Bahir community and request the
creation of a https://github.com/apache/bahir-storm.git repo.



2018-02-01 9:10 GMT+08:00 P. Taylor Goetz :

> I’d start with Storm-Kafka-client as an experiment, and if that goes well,
> move all connectors to the same model.
>
> Some connectors are bound to a stable protocol (e.g. JMS, MQTT), some are
> bound to frequently changing APIs (e.g. Apache Kafka, cassandra, ES, etc.).
> The former tend to be stable in terms of usage patterns and use cases, the
> latter case case not so much. For example, consider hdfs integration. It’s
> changed a lot in response to different usage patterns. Kafka due to
> new/changing APIs. JMS hasn’t changed much at all since it’s tied to a
> stable API.
>
> There’s also the fact that a high percentage of connectors integrate with
> the most stable Storm APIs (spout, bolt, trident). The volatile (using the
> term loosely) parts of our API affect projects like Mesos and streamparse,
> but not the connectors we sponsor.
>
> -Taylor
>
> > On Jan 31, 2018, at 7:07 PM, Roshan Naik  wrote:
> >
> > I was thinking that if any connector is released more frequently, its
> > quality would mature more and it would typically have a lower impact on a
> > Storm release (compared to now) … if we decide to bundle them in Storm as
> > well.
> >
> >
> > On 1/31/18, 4:02 PM, "P. Taylor Goetz"  wrote:
> >
> >I think we all agree that releasing connectors as part of a Storm
> release hinders the frequency of the release cycle for both Storm proper,
> as well as connectors.
> >
> >If that’s the case, then the question is how to proceed.
> >
> >-Taylor
> >
> >> On Jan 31, 2018, at 6:46 PM, Roshan Naik 
> wrote:
> >>
> >> One thought is to …
> >> - do a frequent separate release
> >> - *and also* include the latest stuff along with each Storm release.
> >>
> >> -roshan
> >>
> >>
> >> On 1/31/18, 10:43 AM, "generalbas@gmail.com on behalf of Stig
> Rohde Døssing"  stigdoess...@gmail.com> wrote:
> >>
> >>   Hugo,
> >>   It's not my impression that anyone is complaining that
> storm-kafka-client
> >>   has been exceptionally buggy, or that we haven't been fixing the
> issues as
> >>   they crop up. The problem is that we're sitting on the fixes for way
> longer
> >>   than is reasonable, and even if we release Storm more often, users
> have to
> >>   go out of their way to know that they should really be using the
> latest
> >>   storm-kafka-client rather than the one that ships with their Storm
> >>   installation, because the version number of storm-kafka-client
> happens to
> >>   not mean anything regarding compatibility with Storm.
> >>
> >>   Everyone,
> >>
> >>   Most of what I've written here has already been said, but I've already
> >>   written it so...
> >>
> >>   I really don't see the point in going through the effort of separating
> >>   connectors out to another repository if we're just going to make the
> other
> >>   repository the second class citizen connector graveyard.
> >>
> >>   The point to separating storm-kafka-client out is so it can get a
> release
> >>   cycle different from Storm, so we can avoid the situation we're in
> now in
> >>   the future. There's obviously a flaw in our process when we have to
> choose
> >>   between breaking semantic versioning and releasing broken software.
> >>
> >>   I agree that it would be good to release Storm a little more often,
> but I
> >>   don't think that fully addresses my concerns. Are we willing to
> increment
> >>   Storm's major version number if a connector needs to break its API
> (e.g. as
> >>   I want to do in https://github.com/apache/storm/pull/2300)?
> >>
> >>   I think a key observation is that Storm's core API is extremely
> stable.
> >>   Storm and the connectors aren't usually tightly coupled in the sense
> that
> >>   e.g. version 1.0.2 of storm-kafka-client would only work with Storm
> 1.0.2
> >>   and not 1.0.0, so in many cases there's no reason you wouldn't use the
> >>   latest connector version instead of the one that happens to ship with
> the
> >>   version of Storm you're using. I think 

[GitHub] storm pull request #2551: [STORM-2940] Dynamically set the CAPACITY value of...

2018-02-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2551#discussion_r167247297
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
@@ -85,6 +85,7 @@ public void prepare(WorkerTopologyContext context, 
GlobalStreamId stream, List

[GitHub] storm issue #2554: STORM-2939 add WorkerMetricsProcessor interface

2018-02-09 Thread agresch
Github user agresch commented on the issue:

https://github.com/apache/storm/pull/2554
  
1) My intent was that the metricstore would be for insertion, and the 
metricprocessor for either forwarding the metrics to the metricstore or just 
extending a metricstore and doing the metric insertion in place.  The HBase 
metricprocessor implementation I'm working on does the latter, inserting 
directly from the Supervisor.

2) I do not have a plan for this yet, but we plan on tackling this.  We're 
planning on getting the metrics forwarded from the workers to the Supervisor 
and inserting into HBase there (or forwarding to Nimbus for RocksDB).  In the 
non-HBase implementation, we need to be able to have a path from the workers to 
Nimbus.
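
The two strategies described above -- a processor that forwards metrics to a
separate store, versus one that extends the store and inserts in place -- can
be sketched abstractly. This is a hypothetical illustration only; the actual
WorkerMetricsProcessor and MetricStore interfaces in STORM-2939 differ:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interfaces for illustration -- not the real STORM-2939 APIs.
interface MetricStore {
    void insert(String metric);
}

interface MetricsProcessor {
    void process(String metric);
}

// Strategy 1: forward each metric to a separate backing store
// (e.g. workers -> Supervisor -> Nimbus -> RocksDB).
class ForwardingProcessor implements MetricsProcessor {
    private final MetricStore store;

    ForwardingProcessor(MetricStore store) {
        this.store = store;
    }

    @Override
    public void process(String metric) {
        store.insert(metric);
    }
}

// Strategy 2: the processor implements the store itself and inserts
// in place (e.g. an HBase processor writing directly from the
// Supervisor, with no extra forwarding hop).
class InPlaceProcessor implements MetricsProcessor, MetricStore {
    final List<String> rows = new ArrayList<>();

    @Override
    public void insert(String metric) {
        rows.add(metric);
    }

    @Override
    public void process(String metric) {
        insert(metric);
    }
}
```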


---


Re: [VOTE] Release Apache Storm 1.0.6 (rc3)

2018-02-09 Thread Jungtaek Lim
I just went ahead verifying current RC except serialization UID issue in
Fields. I could also vote for RC4 immediately if necessary.

+1 (binding)

> source

- verify file (signature, MD5, SHA)
-- source, tar.gz : OK
-- source, zip : OK

- extract file
-- source, tar.gz : OK
-- source, zip : OK

- diff-ing extracted files between tar.gz and zip : OK

- build source with JDK 7
-- source, tar.gz : OK

- build source dist
-- source, tar.gz : OK

- build binary dist
-- source, tar.gz : OK

> binary

- verify file (signature, MD5, SHA)
-- binary, tar.gz : OK
-- binary, zip : OK

- extract file
-- binary, tar.gz : OK
-- binary, zip : OK

- diff-ing extracted files between tar.gz and zip : OK

- launch daemons : OK

- run RollingTopWords (local) : OK

- run RollingTopWords (remote) : OK
  - activate / deactivate / rebalance / kill : OK
  - logviewer (worker dir, daemon dir) : OK
  - change log level : OK
  - thread dump, heap dump, restart worker : OK
  - log search : OK

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Feb 9, 2018 at 6:18 PM, Erik Weathers wrote:

> I'm fine submitting a PR to back that line out (or any of you committer
> folks could just rip it out).
>
> But I'd like to understand Storm a bit better as part of making this
> decision. :-)  Am I correct in assuming it would only be a problem if the
> serialized Fields were stored somewhere (e.g., ZooKeeper, local filesystem)
> and then read back in after the Nimbus/Workers are brought back up after
> the upgrade?  Seems Fields is used in a *lot* of places, and I don't know
> precisely what is serialized for reuse upon Storm Nimbus/Worker daemon
> restarts.  I believe there are examples of Fields being used to create
> Spout or Bolt objects that are used to create the StormTopology object,
> which I believe is serialized into ZooKeeper.  But I'm not clear if it's
> directly the Fields object itself or some kind of translation from that
> into the thrift objects that make up StormTopology.
>
> I also don't know exactly when kryo is applicable in Storm.  I've never
> done anything with kryo directly.
>
> - Erik
>
> On Thu, Feb 8, 2018 at 10:00 PM, P. Taylor Goetz 
> wrote:
>
> > *serialized* ;)
> >
> > > On Feb 9, 2018, at 12:48 AM, P. Taylor Goetz 
> wrote:
> > >
> > > I’d have to check (can’t right now), but I think that class gets
> > sterilized via kryo. If that’s not the case, yes, it could cause
> problems.
> > >
> > > I think the safest option would be to remove the serialversionuid.
> > >
> > > -Taylor
> > >
> > >> On Feb 8, 2018, at 5:36 PM, Erik Weathers
> 
> > wrote:
> > >>
> > >> Something I just realized -- in the storm-kafka-client stomping into
> > >> 1.0.x-branch PR, I backported a change to Fields.java which added a
> > >> serialVersionUID.
> > >> Could that potentially break topologies when you upgrade storm-core on
> > the
> > >> servers (nimbus, workers) from 1.0.{1..5} to 1.0.6?   I'm not super
> > >> familiar with the serialization that occurs in Storm and whether that
> > could
> > >> break people.
> > >>
> > >> https://github.com/apache/storm/pull/2550/files#diff-71a428d
> > 508c4f5af0bfe3cc186e8edcf
> > >>
> > >> - Erik
> > >>
> > >>> On Thu, Feb 8, 2018 at 1:25 PM, Bobby Evans 
> > wrote:
> > >>>
> > >>> +1 I built the code from the git tag, ran all the unit tests (which
> > passed
> > >>> the first time), and ran some tests on a single node cluster.
> > >>>
> > >>> It all looked good.
> > >>>
> > >>> - Bobby
> > >>>
> >  On Thu, Feb 8, 2018 at 1:22 PM P. Taylor Goetz 
> > wrote:
> > 
> >  This is a call to vote on releasing Apache Storm 1.0.6 (rc3)
> > 
> >  Full list of changes in this release:
> > 
> > 
> >  https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.
> > >>> 0.6-rc3/RELEASE_NOTES.html
> > 
> >  The tag/commit to be voted upon is v1.0.6:
> > 
> > 
> >  https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=
> > >>> e68365f9f947ddd1794b2edef2149fdfaa1590a2;hb=7993db01580ce62d
> > 44866dc00e0a72
> > >>> 66984638d0
> > 
> >  The source archive being voted upon can be found here:
> > 
> > 
> >  https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.
> > >>> 0.6-rc3/apache-storm-1.0.6-src.tar.gz
> > 
> >  Other release files, signatures and digests can be found here:
> > 
> > 
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.6-rc3/
> > 
> >  The release artifacts are signed with the following key:
> > 
> > 
> >  https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_
> > >>> plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd
> > 
> >  The Nexus staging repository for this release is:
> > 
> >  https://repository.apache.org/content/repositories/orgapache
> > storm-1060
> > 
>  Please vote on releasing this package as Apache Storm 1.0.6.

Re: [VOTE] Release Apache Storm 1.0.6 (rc3)

2018-02-09 Thread Erik Weathers
I'm fine submitting a PR to back that line out (or any of you committer
folks could just rip it out).

But I'd like to understand Storm a bit better as part of making this
decision. :-)  Am I correct in assuming it would only be a problem if the
serialized Fields were stored somewhere (e.g., ZooKeeper, local filesystem)
and then read back in after the Nimbus/Workers are brought back up after
the upgrade?  Seems Fields is used in a *lot* of places, and I don't know
precisely what is serialized for reuse upon Storm Nimbus/Worker daemon
restarts.  I believe there are examples of Fields being used to create
Spout or Bolt objects that are used to create the StormTopology object,
which I believe is serialized into ZooKeeper.  But I'm not clear if it's
directly the Fields object itself or some kind of translation from that
into the thrift objects that make up StormTopology.
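
As background for readers unfamiliar with Java serialization compatibility:
when a Serializable class declares no serialVersionUID, the JVM computes a
default one from the class's structure, and deserialization fails with
InvalidClassException if the stored stream's UID differs from the loading
class's UID. A minimal, Storm-free sketch of where the default comes from
(class and field names here are invented for the demo):

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Toy class with no explicit serialVersionUID: the JVM derives one
// from the class shape (name, fields, methods, interfaces).
class NoUid implements Serializable {
    int x;
}

public class UidDemo {
    // Returns the serialVersionUID the serialization machinery will use.
    static long uidOf(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }

    public static void main(String[] args) {
        // Deterministic for a given class shape -- but if the class later
        // declares an explicit serialVersionUID with a different value
        // (the risk discussed above for Fields), objects serialized under
        // the old computed UID throw InvalidClassException on readObject.
        System.out.println("computed UID: " + uidOf(NoUid.class));
    }
}
```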

I also don't know exactly when kryo is applicable in Storm.  I've never
done anything with kryo directly.

- Erik

On Thu, Feb 8, 2018 at 10:00 PM, P. Taylor Goetz  wrote:

> *serialized* ;)
>
> > On Feb 9, 2018, at 12:48 AM, P. Taylor Goetz  wrote:
> >
> > I’d have to check (can’t right now), but I think that class gets
> sterilized via kryo. If that’s not the case, yes, it could cause problems.
> >
> > I think the safest option would be to remove the serialversionuid.
> >
> > -Taylor
> >
> >> On Feb 8, 2018, at 5:36 PM, Erik Weathers 
> wrote:
> >>
> >> Something I just realized -- in the storm-kafka-client stomping into
> >> 1.0.x-branch PR, I backported a change to Fields.java which added a
> >> serialVersionUID.
> >> Could that potentially break topologies when you upgrade storm-core on
> the
> >> servers (nimbus, workers) from 1.0.{1..5} to 1.0.6?   I'm not super
> >> familiar with the serialization that occurs in Storm and whether that
> could
> >> break people.
> >>
> >> https://github.com/apache/storm/pull/2550/files#diff-71a428d
> 508c4f5af0bfe3cc186e8edcf
> >>
> >> - Erik
> >>
> >>> On Thu, Feb 8, 2018 at 1:25 PM, Bobby Evans 
> wrote:
> >>>
> >>> +1 I built the code from the git tag, ran all the unit tests (which
> passed
> >>> the first time), and ran some tests on a single node cluster.
> >>>
> >>> It all looked good.
> >>>
> >>> - Bobby
> >>>
>  On Thu, Feb 8, 2018 at 1:22 PM P. Taylor Goetz 
> wrote:
> 
>  This is a call to vote on releasing Apache Storm 1.0.6 (rc3)
> 
>  Full list of changes in this release:
> 
> 
>  https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.
> >>> 0.6-rc3/RELEASE_NOTES.html
> 
>  The tag/commit to be voted upon is v1.0.6:
> 
> 
>  https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=
> >>> e68365f9f947ddd1794b2edef2149fdfaa1590a2;hb=7993db01580ce62d
> 44866dc00e0a72
> >>> 66984638d0
> 
>  The source archive being voted upon can be found here:
> 
> 
>  https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.
> >>> 0.6-rc3/apache-storm-1.0.6-src.tar.gz
> 
>  Other release files, signatures and digests can be found here:
> 
>  https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.6-rc3/
> 
>  The release artifacts are signed with the following key:
> 
> 
>  https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_
> >>> plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd
> 
>  The Nexus staging repository for this release is:
> 
>  https://repository.apache.org/content/repositories/orgapache
> storm-1060
> 
>  Please vote on releasing this package as Apache Storm 1.0.6.
> 
>  When voting, please list the actions taken to verify the release.
> 
>  This vote will be open for at least 72 hours.
> 
>  [ ] +1 Release this package as Apache Storm 1.0.6
>  [ ]  0 No opinion
>  [ ] -1 Do not release this package because...
> 
>  Thanks to everyone who contributed to this release.
> 
>  -Taylor
> >>>
>


Re: [VOTE] Release Apache Storm 1.1.2 (rc3)

2018-02-09 Thread Jungtaek Lim
+1 (binding)

> source

- verify file (signature, MD5, SHA)
-- source, tar.gz : OK
-- source, zip : OK

- extract file
-- source, tar.gz : OK
-- source, zip : OK

- diff-ing extracted files between tar.gz and zip : OK

- build source with JDK 7 (without the all-tests flag: integration tests fail
with high probability, hence relying on Bobby's verification)
-- source, tar.gz : OK

- build source dist
-- source, tar.gz : OK

- build binary dist
-- source, tar.gz : OK

> binary

- verify file (signature, MD5, SHA)
-- binary, tar.gz : OK
-- binary, zip : OK

- extract file
-- binary, tar.gz : OK
-- binary, zip : OK

- diff-ing extracted files between tar.gz and zip : OK

- launch daemons : OK

- run RollingTopWords (local) : OK

- run RollingTopWords (remote) : OK
  - activate / deactivate / rebalance / kill : OK
  - logviewer (worker dir, daemon dir) : OK
  - change log level : OK
  - thread dump, heap dump, restart worker : OK
  - log search : OK

Thanks,
Jungtaek Lim (HeartSaVioR)


On Fri, Feb 9, 2018 at 5:59 AM, Bobby Evans wrote:

> +1 I built from the tag and did some basic tests on a single node cluster.
>
> I am disappointed that it took me 7 times to get all of the unit tests to
> pass for the build, but I don't think it should block a release.
>
> - Bobby
>
> On Thu, Feb 8, 2018 at 11:48 AM P. Taylor Goetz  wrote:
>
> > This is a call to vote on releasing Apache Storm 1.1.2 (rc3)
> >
> > Full list of changes in this release:
> >
> >
> >
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.1.2-rc3/RELEASE_NOTES.html
> >
> > The tag/commit to be voted upon is v1.1.2:
> >
> >
> >
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=3aab00d053d3576d3f8a37bcc3c7e51072ead22e;hb=0bb7a66a9b26ad96afc81b27dd45f93ae8969c44
> >
> > The source archive being voted upon can be found here:
> >
> >
> >
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.1.2-rc3/apache-storm-1.1.2-src.tar.gz
> >
> > Other release files, signatures and digests can be found here:
> >
> > https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.1.2-rc3/
> >
> > The release artifacts are signed with the following key:
> >
> >
> >
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd
> >
> > The Nexus staging repository for this release is:
> >
> > https://repository.apache.org/content/repositories/orgapachestorm-1059
> >
> > Please vote on releasing this package as Apache Storm 1.1.2.
> >
> > When voting, please list the actions taken to verify the release.
> >
> > This vote will be open for at least 72 hours.
> >
> > [ ] +1 Release this package as Apache Storm 1.1.2
> > [ ]  0 No opinion
> > [ ] -1 Do not release this package because...
> >
> > Thanks to everyone who contributed to this release.
> >
> > -Taylor
> >
>


Re: [VOTE] Release Apache Storm 1.2.0 (rc4)

2018-02-09 Thread Alexandre Vermeerbergen
Hello,

For your information, yesterday I upgraded our preproduction cluster
from Storm 1.2.0 SNAPSHOT of December 2017 to 1.2.0 Release Candidate
4.
Our Storm-based alerting system behaved quite badly with 1.2.0 RC4,
and this morning one of our team members noticed this message in our
topology's startup logs:

1279 [main] WARN  o.a.s.k.s.KafkaSpoutConfig - The KafkaConsumer
enable.auto.commit setting is not supported. You can configure similar
behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee. This
will be treated as an error in the next major release.

1279 [main] INFO  o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer
property 'enable.auto.commit' to 'false', because the spout does not
support auto-commit
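
For readers hitting the same warning, the replacement the log points at looks
roughly like the sketch below (against the storm-kafka-client 1.2.0 API; the
broker address and topic name are placeholders, and the available
ProcessingGuarantee values should be checked against the 1.2.0 javadoc):

```java
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Instead of setting enable.auto.commit on the underlying consumer,
// declare the desired commit behavior on the spout config itself.
public class AutoCommitMigration {
    public static KafkaSpout<String, String> buildSpout() {
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
            .builder("broker-1:9092", "alerts-topic")  // placeholders
            // AT_LEAST_ONCE commits offsets only for acked tuples;
            // weaker guarantees trade safety for throughput.
            .setProcessingGuarantee(
                KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
            .build();
        return new KafkaSpout<>(spoutConfig);
    }
}
```

This is a configuration fragment requiring storm-kafka-client on the
classpath, not a standalone program.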


So I will resume our tests with this new setting and will report back
once I have enough uptime on our "large preproduction cluster" with
1.2.0 RC4.

Note: may I suggest making this breaking change visible in the Storm
1.2.0 release notes? It is quite impactful. Or even better: make
topologies fail to start when they use such a removed property, so
that people aren't left wondering why their Kafka spouts no longer
behave as before until they check the logs?

More to come when my preproduction tests have been completed (1 or 2
days needed).

Best regards,
Alexandre

2018-02-07 23:24 GMT+01:00 P. Taylor Goetz :
> This is a call to vote on releasing Apache Storm 1.2.0 (rc4)
>
> Note that the only difference between rc4 and rc3 is the fix for
> https://issues.apache.org/jira/browse/STORM-2942.
>
> Full list of changes in this release:
>
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.2.0-rc4/RELEASE_NOTES.html
>
> The tag/commit to be voted upon is v1.2.0:
>
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=cef4d49e222e53656f38c40d754d4f41799cd9a9;hb=2a0097f9a20b9df494caadb87c87d4e4db01a7ed
>
> The source archive being voted upon can be found here:
>
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.2.0-rc4/apache-storm-1.2.0-src.tar.gz
>
> Other release files, signatures and digests can be found here:
>
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.2.0-rc4/
>
> The release artifacts are signed with the following key:
>
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd
>
> The Nexus staging repository for this release is:
>
> https://repository.apache.org/content/repositories/orgapachestorm-1058
>
> Please vote on releasing this package as Apache Storm 1.2.0.
>
> When voting, please list the actions taken to verify the release.
>
> This vote will be open for at least 72 hours.
>
> [ ] +1 Release this package as Apache Storm 1.2.0
> [ ]  0 No opinion
> [ ] -1 Do not release this package because...
>
> Thanks to everyone who contributed to this release.
>
> -Taylor