Re: [VOTE] Release Apache Storm 1.0.5 (rc1)

2017-09-13 Thread P. Taylor Goetz
This vote is now closed and passes with 3 binding +1 votes, 1 non-binding +1 
vote, and one non-binding 0 vote.

Vote tally (* denotes a binding vote):

+1:
Jungtaek Lim*
Stig Rohde Døssing*
Priyank Shah
P. Taylor Goetz*

+0:
Alexandre Vermeerbergen 

Thanks to everyone who took the time to evaluate and vote on this release 
candidate.

I will release the artifacts from staging and announce the release after the
24-hour waiting period.

-Taylor
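
For anyone checking the tally above: ASF release votes use majority approval, meaning at least three binding +1 votes and more binding +1 votes than binding -1 votes. A minimal sketch of that count follows. The class and method names (ReleaseVoteTally, Vote, passes) are illustrative only and are not part of any Apache tooling.

```java
import java.util.Arrays;
import java.util.List;

final class ReleaseVoteTally {
    /** One cast vote: value is +1, 0 or -1; binding is true for PMC members. */
    static final class Vote {
        final String voter;
        final int value;
        final boolean binding;

        Vote(String voter, int value, boolean binding) {
            this.voter = voter;
            this.value = value;
            this.binding = binding;
        }
    }

    /** Majority approval: at least three binding +1s and more binding +1s than -1s. */
    static boolean passes(List<Vote> votes) {
        int bindingPlus = 0;
        int bindingMinus = 0;
        for (Vote v : votes) {
            if (!v.binding) {
                continue; // non-binding votes are advisory and not counted
            }
            if (v.value > 0) {
                bindingPlus++;
            } else if (v.value < 0) {
                bindingMinus++;
            }
        }
        return bindingPlus >= 3 && bindingPlus > bindingMinus;
    }

    public static void main(String[] args) {
        // The votes recorded in this thread.
        List<Vote> votes = Arrays.asList(
            new Vote("Jungtaek Lim", 1, true),
            new Vote("Stig Rohde D\u00f8ssing", 1, true),
            new Vote("Priyank Shah", 1, false),
            new Vote("P. Taylor Goetz", 1, true),
            new Vote("Alexandre Vermeerbergen", 0, false));
        System.out.println("vote passes: " + passes(votes));
    }
}
```

With only the two binding +1s that existed before the release manager voted, the same check would not pass, which is why the reminder further down the thread asks for one more PMC vote.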

> On Sep 6, 2017, at 2:29 PM, P. Taylor Goetz  wrote:
> 
> This is a call to vote on releasing Apache Storm 1.0.5 (rc1)
> 
> Full list of changes in this release:
> 
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.5-rc1/RELEASE_NOTES.html
> 
> The tag/commit to be voted upon is v1.0.5:
> 
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=239fd1522114c4928ed99b4ab5bcbf027540d5e7;hb=a445446e46e2228fabb83368cc25cf5422a2c400
> 
> The source archive being voted upon can be found here:
> 
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.5-rc1/apache-storm-1.0.5-src.tar.gz
> 
> Other release files, signatures and digests can be found here:
> 
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.5-rc1/
> 
> The release artifacts are signed with the following key:
> 
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd
> 
> The Nexus staging repository for this release is:
> 
> https://repository.apache.org/content/repositories/orgapachestorm-1051
> 
> Please vote on releasing this package as Apache Storm 1.0.5.
> 
> When voting, please list the actions taken to verify the release.
> 
> This vote will be open for at least 72 hours.
> 
> [ ] +1 Release this package as Apache Storm 1.0.5
> [ ]  0 No opinion
> [ ] -1 Do not release this package because...
> 
> Thanks to everyone who contributed to this release.
> 
> -Taylor
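
The call above asks voters to check the published digests. As a rough illustration of what that check amounts to, here is a Java sketch that computes a file's digest with java.security.MessageDigest and compares it with a published hex string. The class name DigestCheck and the command-line arguments are placeholders, and PGP signature verification against the KEYS file is a separate step that this sketch does not cover.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;

final class DigestCheck {
    /** Streams the file through the named algorithm (e.g. "MD5", "SHA-512") and returns lowercase hex. */
    static String hexDigest(Path file, String algorithm)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance(algorithm);
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        StringBuilder sb = new StringBuilder();
        for (byte b : md.digest()) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    /** Compares against a published digest, ignoring whitespace and letter case. */
    static boolean matches(Path file, String algorithm, String publishedHex)
            throws IOException, NoSuchAlgorithmException {
        String normalized = publishedHex.replaceAll("\\s", "").toLowerCase(Locale.ROOT);
        return hexDigest(file, algorithm).equals(normalized);
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("usage: DigestCheck <file> <algorithm> <published-hex>");
            return;
        }
        boolean ok = matches(Paths.get(args[0]), args[1], args[2]);
        System.out.println(ok ? "digest OK" : "digest MISMATCH");
    }
}
```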



Re: [VOTE] Release Apache Storm 1.0.5 (rc1)

2017-09-13 Thread P. Taylor Goetz
Thanks for the bump Jungtaek. I know my +1 vote would take this vote to the 
necessary count, but my hope was that other PMC members would vote as well (I 
like to see at least 3 binding +1s outside of the RM’s vote). Nonetheless…

+1 (binding)
Set up a multi-node cluster and ran my usual suite of semi-automatic tests.

-Taylor


> On Sep 12, 2017, at 8:42 PM, Jungtaek Lim  wrote:
> 
> Kind reminder: there are 2 binding +1s, 1 non-binding +1, and 1 non-binding 0.
> To finish the vote, at least one more binding +1 is needed. Could any PMC
> members please participate?
> 
> On Sat, Sep 9, 2017 at 9:47 AM, Priyank Shah wrote:
> 
>> +1 (non-binding)
>> 
>> Verified md5 and sha
>> Successfully extracted source and distributions
>> Built source “mvn clean install” successfully
>> Launched  daemons(nimbus, supervisor, ui, logviewer) successfully from
>> storm-dist after packaging
>> Ran FastWordCountTopology
>> Played around with UI
>> Checked worker logs and other daemon logs
>> Brought down the daemons without issues
>> 
>> 
>> 
>> On 9/7/17, 11:45 AM, "generalbas@gmail.com on behalf of Stig Rohde
>> Døssing" 
>> wrote:
>> 
>>* Extracted binary zip and built storm-starter using the included example
>>code and the Nexus staging repo.
>>* After working around https://issues.apache.org/jira/browse/STORM-2451 (I
>>forgot to pull it into 1.0.x after the 1.0.4 release, and don't want to
>>delay this release for that issue), started Nimbus, one supervisor and UI
>>daemons.
>>* Verified storm-local is no longer relative to the directory where storm
>>nimbus is executed.
>>* Deployed WordCountTopology and verified no errors in the worker logs, and
>>that the topology produces word counts in the log.
>>* Clicked around a bit in UI for the topology and components
>>* Killed the topology.
>> 
>>+1 (binding)
>> 
>>2017-09-07 10:56 GMT+02:00 Alexandre Vermeerbergen <
>> avermeerber...@gmail.com
>>> :
>> 
>>> Hello,
>>> 
>>> (non binding)
>>> 
>>> [x]  0 No opinion
>>> 
>>> => we are currently running 1.1.0 and waiting for the
>>> https://issues.apache.org/jira/browse/STORM-2648 fix, which was
>>> unfortunately shifted to 1.2.0 (although the lack of acks for KafkaSpout
>>> when it's running in autocommit mode sounds more like a bug than a
>>> feature), so we are skipping testing of 1.0.5
>>> 
>>> Best regards,
>>> Alexandre Vermeerbergen
>>> 
>>> 
>>> 2017-09-07 2:38 GMT+02:00 Jungtaek Lim :
>>> 
 +1 (binding)
 
> source
 
 - verify file (signature, MD5, SHA)
 -- source, tar.gz : OK
 -- source, zip : OK
 
 - extract file
 -- source, tar.gz : OK
 -- source, zip : OK
 
 - diff-ing extracted files between tar.gz and zip : OK
 
 - build source with JDK 7
 -- source, tar.gz : OK
 
 - build source dist
 -- source, tar.gz : OK
 
 - build binary dist
 -- source, tar.gz : OK
 
> binary
 
 - verify file (signature, MD5, SHA)
 -- binary, tar.gz : OK
 -- binary, zip : OK
 
 - extract file
 -- source, tar.gz : OK
 -- source, zip : OK
 
 - diff-ing extracted files between tar.gz and zip : OK
 
 - launch daemons : OK
 
 - run RollingTopWords (local) : OK
 
 - run RollingTopWords (remote) : OK
  - activate / deactivate / rebalance / kill : OK
  - logviewer (worker dir, daemon dir) : OK
  - change log level : OK
  - thread dump, heap dump, restart worker :
  - log search : OK
 
 The most critical issue in 1.0.5 has been verified: upgrade from Storm 1.0.4
 to Storm 1.0.5 RC1 succeeds, though I've not tried RU. I tested this by
 shutting down the 1.0.4 cluster and starting a 1.0.5 cluster, with both
 clusters sharing the same storm-local directory and ZK path.
 
 Thanks,
 Jungtaek Lim (HeartSaVioR)
 
 On Thu, Sep 7, 2017 at 3:29 AM, P. Taylor Goetz wrote:
 
> This is a call to vote on releasing Apache Storm 1.0.5 (rc1)
> 
> Full list of changes in this release:
> 
> 
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.5-rc1/RELEASE_NOTES.html
> 
> The tag/commit to be voted upon is v1.0.5:
> 
> 
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=239fd1522114c4928ed99b4ab5bcbf027540d5e7;hb=a445446e46e2228fabb83368cc25cf5422a2c400
> 
> The source archive being voted upon can be found here:
> 
> 
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.5-rc1/apache-storm-1.0.5-src.tar.gz
> 
> Other release files, signatures and digests can be found here:
> 
> 
>> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.5-rc1/
> 
> The release artifacts are signed with the following key:

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2319
  
@danny0405 I am not concerned about the memory for the cache, because we 
are already keeping all of that in memory while we schedule the topologies 
anyways.  If it does prove to be an issue we can address it then.


---


[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138657756
  
--- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
@@ -123,6 +133,7 @@
 (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
 (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
 (def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
+(def LEADERINFO-SUBTREE (str "/" LEADERINFO-ROOT))
--- End diff --

Is this needed?  We already have a way of getting the leader that is built 
into the client.  Why do we need a second way?


---


[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138662036
  
--- Diff: storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java ---
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.assignments;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.utils.SupervisorClient;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A service for distributing master assignments to supervisors, this service makes the assignments notification asynchronous.
+ * We support multiple working threads to distribute assignment, every thread has a queue buffer.
+ * Master will shuffle its node request to the queues, if the target queue is full, we just discard the request, let the supervisors sync instead.
+ * 
+ * {@code
+ * Working mode
+ *                      +--------+         +-----------------+
+ *                      | queue1 |   ==>   | Working thread1 |
+ * +--------+ shuffle   +--------+         +-----------------+
+ * | Master |   ==>
+ * +--------+           +--------+         +-----------------+
+ *                      | queue2 |   ==>   | Working thread2 |
+ *                      +--------+         +-----------------+
+ * }
+ * 
+ */
+public class AssignmentDistributionService implements Closeable {
+private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class);
+private ExecutorService service;
+
+/**
+ * Flag to indicate if the service is active
+ */
+private volatile boolean active = false;
+
+private Random random;
+/**
+ * Working threads num.
+ */
+private int threadsNum = 0;
+/**
+ * Working thread queue size.
+ */
+private int queueSize = 0;
+
+/**
+ * Assignments request queue.
+ */
+private volatile Map assignmentsQueue;
+
+private Map conf;
+
+/**
+ * Function for initialization.
+ *
+ * @param conf
+ */
+public void prepare(Map conf) {
+this.conf = conf;
+this.random = new Random(47);
+
+this.threadsNum = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
+this.queueSize = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);
+
+this.assignmentsQueue = new HashMap<>();
+for (int i = 0; i < threadsNum; i++) {
+this.assignmentsQueue.put(i, new LinkedBlockingQueue(queueSize));
+}
+//start the thread pool
+this.service = Executors.newFixedThreadPool(threadsNum);
+this.active = true;
+//start the threads
+for (int i = 0; i < threadsNum; i++) {
+this.service.submit(new DistributeTask(this, i));
+}
+}
+
+@Override
+public void close() throws IOException {
+this.active = false;
+this.service.shutdownNow();
+try {
+this.service.awaitTermination(10l, TimeUnit.SECONDS);
+} catch (InterruptedException e) {
+LOG.error("Failed to close assignments distribute service");
+}
+this.assignmentsQueue = null;
+}
+
+public void addAssignmentsForNode(String node, SupervisorAssignments 
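
The class comment in the diff above describes a fan-out pattern: each worker thread owns a bounded queue, the master picks a queue at random, and a request is dropped when the chosen queue is full, leaving the supervisors to re-sync on their own. Below is a self-contained sketch of that pattern using only java.util.concurrent. It is not the PR's code: ShuffleDispatcher, submit and the String payload are hypothetical stand-ins for the real service and for SupervisorAssignments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ShuffleDispatcher implements AutoCloseable {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();
    private final ExecutorService pool;
    private final Random random = new Random();
    private volatile boolean active = true;

    /** Starts one worker per queue; handler is the (hypothetical) "send to supervisor" step. */
    ShuffleDispatcher(int threads, int queueSize, Consumer<String> handler) {
        pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>(queueSize);
            queues.add(queue);
            pool.submit(() -> drain(queue, handler));
        }
    }

    private void drain(BlockingQueue<String> queue, Consumer<String> handler) {
        try {
            while (active) {
                // Poll with a timeout so the loop notices when active flips to false.
                String request = queue.poll(100, TimeUnit.MILLISECONDS);
                if (request != null) {
                    handler.accept(request);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and exit
        }
    }

    /** Non-blocking: returns false when the randomly chosen queue is full and the request is dropped. */
    boolean submit(String request) {
        if (!active) {
            return false;
        }
        int index = random.nextInt(queues.size());
        return queues.get(index).offer(request);
    }

    @Override
    public void close() throws InterruptedException {
        active = false;
        pool.shutdownNow();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        try (ShuffleDispatcher d = new ShuffleDispatcher(2, 100, r -> System.out.println("sent " + r))) {
            for (int i = 0; i < 5; i++) {
                d.submit("assignment-" + i);
            }
            Thread.sleep(500); // give the workers time to drain before closing
        }
    }
}
```

Using offer() rather than put() is what makes the master side non-blocking: a full queue costs one dropped notification instead of stalling the caller.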

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138663085
  
--- Diff: storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java ---
@@ -0,0 +1,232 @@
+public void addAssignmentsForNode(String node, SupervisorAssignments 

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138659723
  
--- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
@@ -244,15 +255,19 @@
 
 ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
 (defnk mk-storm-cluster-state
-  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
-  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
-[false cluster-state-spec]
-[true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
+  [conf :cluster-state nil :acls nil :context (ClusterStateContext.) :backend nil]
--- End diff --

Could we rename backend to assignments-backend?  backend feels too generic 
for what this actually is.


---


[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138659373
  
--- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
@@ -244,15 +255,19 @@
 
 ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
 (defnk mk-storm-cluster-state
-  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
-  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
-[false cluster-state-spec]
-[true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
+  [conf :cluster-state nil :acls nil :context (ClusterStateContext.) :backend nil]
+  (let [[solo? cluster-state] (if (and (not-nil? cluster-state) (instance? ClusterState cluster-state))
+[false cluster-state]
+[true (mk-distributed-cluster-state conf :auth-conf conf :acls acls :context context)])
+assignments-backend (if (nil? backend)
+  (doto (InMemoryAssignmentBackend.) (.prepare nil nil))
--- End diff --

Why do we call `(.prepare nil nil)` only on the InMemoryAssignmentBackend?  
It feels like we might be making it difficult to extend it in the future.  From 
what I have seen the assignments dir is a subdirectory of storm-local.  Could 
we just make prepare take the conf, and then pass in the conf we already 
have?
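
One way to read the suggestion above, sketched in Java: every backend exposes the same prepare(conf) hook, and the caller always invokes it with the configuration it already holds, so no implementation is special-cased. All names here (AssignmentBackend, InMemoryBackend, BackendDemo.create) are hypothetical and do not reproduce the PR's actual types or signatures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class BackendDemo {
    /** Uses the supplied backend if any, else the in-memory default; prepares either the same way. */
    static AssignmentBackend create(AssignmentBackend custom, Map<String, Object> conf) {
        AssignmentBackend backend = (custom != null) ? custom : new InMemoryBackend();
        backend.prepare(conf); // same call for every implementation
        return backend;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        AssignmentBackend backend = create(null, conf);
        backend.put("topology-1", new byte[] {1, 2, 3});
        System.out.println("stored bytes: " + backend.get("topology-1").length);
    }
}

/** Hypothetical storage contract for serialized assignments, keyed by topology id. */
interface AssignmentBackend {
    void prepare(Map<String, Object> conf);

    void put(String topologyId, byte[] assignment);

    byte[] get(String topologyId);
}

/** Default implementation; it ignores conf, but a disk-backed one could read a directory from it. */
final class InMemoryBackend implements AssignmentBackend {
    private Map<String, byte[]> store;

    @Override
    public void prepare(Map<String, Object> conf) {
        store = new ConcurrentHashMap<>();
    }

    @Override
    public void put(String topologyId, byte[] assignment) {
        store.put(topologyId, assignment);
    }

    @Override
    public byte[] get(String topologyId) {
        return store.get(topologyId);
    }
}
```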


---


[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138662304
  
--- Diff: storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java ---
@@ -0,0 +1,232 @@
+public void addAssignmentsForNode(String node, SupervisorAssignments 

[GitHub] storm issue #2156: STORM-2549: Fix broken enforcement mechanism for maxUncom...

2017-09-13 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2156
  
@WolfeeTJ Thanks for trying this out. I think you are right that it's 
because partition reassignment is happening at a bad time. I think we should 
move reassignment to be the first thing in nextTuple instead. Give me a little 
bit to fix this and maybe add a test.

I've been thinking a bit about partition revocation and shuffling, and we 
might want to add some warnings to the manual partition assignment API as well. 
There are a few cases I can think of where shuffling partitions can cause bugs. 
The Trident spout doesn't support partition shuffling because Trident doesn't 
expect partitions to move from task to task, as far as I can tell. When we 
implement at-most-once support for this spout there's also a requirement that 
partitions don't move between tasks, since otherwise it is possible that tuples 
are emitted more than once.
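
To make the ordering idea concrete, here is a toy Java sketch in which any pending partition reassignment is applied at the very start of nextTuple, before anything is emitted, so records buffered for partitions the task no longer owns are discarded rather than emitted. This is not the KafkaSpout code and the exact failure mode in the PR is not reproduced here. SketchSpout and all of its members are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

final class SketchSpout {
    private Set<Integer> assigned = new HashSet<>();
    private Set<Integer> pendingAssignment;                     // null when nothing changed
    private final Queue<int[]> buffered = new ArrayDeque<>();   // each entry is {partition, offset}
    final List<String> emitted = new ArrayList<>();

    /** Records a new assignment; it takes effect on the next call to nextTuple. */
    void requestAssignment(Set<Integer> partitions) {
        pendingAssignment = new HashSet<>(partitions);
    }

    /** Simulates a record fetched earlier and waiting to be emitted. */
    void buffer(int partition, int offset) {
        buffered.add(new int[] {partition, offset});
    }

    void nextTuple() {
        // Step 1: reassignment comes first, so nothing below sees stale partitions.
        if (pendingAssignment != null) {
            assigned = pendingAssignment;
            pendingAssignment = null;
            buffered.removeIf(record -> !assigned.contains(record[0]));
        }
        // Step 2: emit at most one buffered record from a partition we still own.
        int[] record = buffered.poll();
        if (record != null) {
            emitted.add(record[0] + ":" + record[1]);
        }
    }

    public static void main(String[] args) {
        SketchSpout spout = new SketchSpout();
        spout.requestAssignment(new HashSet<>(Arrays.asList(0, 1)));
        spout.nextTuple();
        spout.buffer(0, 10);
        spout.buffer(1, 20);
        spout.requestAssignment(new HashSet<>(Arrays.asList(1))); // partition 0 is revoked
        spout.nextTuple();
        System.out.println("emitted: " + spout.emitted);
    }
}
```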


---


[GitHub] storm issue #2156: STORM-2549: Fix broken enforcement mechanism for maxUncom...

2017-09-13 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2156
  
@WolfeeTJ I think it's fixed now, please take a look at the new commit.


---


[GitHub] storm pull request #2324: [STORM-2730] Add in config options for acker cpu a...

2017-09-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2324


---


Question: ways to handle inactive pull requests

2017-09-13 Thread Jungtaek Lim
Hi devs,

I have seen some old pull requests for bugfixes and new features going
stale. Some of us have pinged the authors several times without getting a
response for months. For new features we may have to wait for the authors,
but for bugfixes, waiting for the authors means we are aware of a bug but
don't fix it because of credit, which doesn't make sense to me if we have
to wait for months.

So IMHO we should at least handle inactive bugfix pull requests before it
is too late, either by creating a new PR that addresses the same thing
without retaining the commits, or by taking over the PR while retaining the
commits. If possible, it would be ideal to take over inactive but valuable
pull requests while retaining the commits.

What do you think? And does anyone know of any issues, including license,
authorship, and so on, if someone takes over an inactive pull request
while retaining the original author's credit (commits)?

Thanks,
Jungtaek Lim (HeartSaVioR)


[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread danny0405
Github user danny0405 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138791919
  
--- Diff: storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java ---
@@ -0,0 +1,232 @@
+public void addAssignmentsForNode(String node, SupervisorAssignments 

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread danny0405
Github user danny0405 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138780747
  
--- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
@@ -244,15 +255,19 @@
 
 ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
 (defnk mk-storm-cluster-state
-  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
-  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
-[false cluster-state-spec]
-[true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
+  [conf :cluster-state nil :acls nil :context (ClusterStateContext.) :backend nil]
--- End diff --

Okay, I will rename it.


---


[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread danny0405
Github user danny0405 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138781163
  
--- Diff: storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java ---
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.assignments;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.utils.SupervisorClient;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A service for distributing master assignments to supervisors, this 
service makes the assignments notification asynchronous.
+ * We support multiple working threads to distribute assignment, every 
thread has a queue buffer.
+ * Master will shuffle its node request to the queues, if the target 
queue is full, we just discard the request, let the supervisors sync instead.
+ * 
+ * {@code
+ * Working mode
+ *  ++ +-+
+ *  | queue1 |   ==>   | Working thread1 |
+ * ++ shuffle   ++ +-+
+ * | Master |   ==>
+ * ++   ++ +-+
+ *  | queue2 |   ==>   | Working thread2 |
+ *  ++ +-+
+ * }
+ * 
+ */
+public class AssignmentDistributionService implements Closeable {
+private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class);
+private ExecutorService service;
+
+/**
+ * Flag to indicate if the service is active
+ */
+private volatile boolean active = false;
+
+private Random random;
+/**
+ * Working threads num.
+ */
+private int threadsNum = 0;
+/**
+ * Working thread queue size.
+ */
+private int queueSize = 0;
+
+/**
+ * Assignments request queue.
+ */
+private volatile Map assignmentsQueue;
+
+private Map conf;
+
+/**
+ * Function for initialization.
+ *
+ * @param conf
+ */
+public void prepare(Map conf) {
+this.conf = conf;
+this.random = new Random(47);
+
+this.threadsNum = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
+this.queueSize = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);
+
+this.assignmentsQueue = new HashMap<>();
+for (int i = 0; i < threadsNum; i++) {
+this.assignmentsQueue.put(i, new LinkedBlockingQueue(queueSize));
+}
+//start the thread pool
+this.service = Executors.newFixedThreadPool(threadsNum);
+this.active = true;
+//start the threads
+for (int i = 0; i < threadsNum; i++) {
+this.service.submit(new DistributeTask(this, i));
+}
+}
+
+@Override
+public void close() throws IOException {
+this.active = false;
+this.service.shutdownNow();
+try {
+this.service.awaitTermination(10l, TimeUnit.SECONDS);
+} catch (InterruptedException e) {
+LOG.error("Failed to close assignments distribute service");
+}
+this.assignmentsQueue = null;
+}
+
+public void addAssignmentsForNode(String node, SupervisorAssignments 

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread danny0405
Github user danny0405 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138781061
  
--- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
@@ -244,15 +255,19 @@
 
 ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
 (defnk mk-storm-cluster-state
-  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
-  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
-[false cluster-state-spec]
-[true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
+  [conf :cluster-state nil :acls nil :context (ClusterStateContext.) :backend nil]
+  (let [[solo? cluster-state] (if (and (not-nil? cluster-state) (instance? ClusterState cluster-state))
+[false cluster-state]
+[true (mk-distributed-cluster-state conf :auth-conf conf :acls acls :context context)])
+assignments-backend (if (nil? backend)
+  (doto (InMemoryAssignmentBackend.) (.prepare nil nil))
--- End diff --

Okay, I will make it configurable.


---


[GitHub] storm-site issue #1: STORM-2629: Upgrade to latest github-pages to allow Win...

2017-09-13 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm-site/pull/1
  
Which pair(s) of OS and Ruby version are you trying out?
I'm trying out the change, but I'm seeing a crash in redcarpet 
(redcarpet.rb has `require 'redcarpet.so'`, but that file doesn't exist) on 
macOS Sierra (10.12.6) with Ruby 2.4.1. The odd thing is that the standalone 
`redcarpet` command works.


---


[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread danny0405
Github user danny0405 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2319#discussion_r138780949
  
--- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
@@ -123,6 +133,7 @@
 (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
 (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
 (def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
+(def LEADERINFO-SUBTREE (str "/" LEADERINFO-ROOT))
--- End diff --

Actually, this is not strictly needed, but if we always get the leader from 
Nimbus, we make two RPC requests every time we contact the leader 
(supervisor to Nimbus). Getting it directly from ZooKeeper is a better way.


---


Re: Bare minimum requirements to run a Storm Worker

2017-09-13 Thread Gayashan Amarasinghe
Hi Bobby,

Thank you very much for the details. I'm not fixed on a specific version
yet, but would prefer the latest. Is it possible to run with minimal network
usage beyond receiving and delivering tuples, for example by shutting down
ackers or running with no supervision, and what are the pros and cons of
doing so? Do you have any studies or details on that, or can you point me to
where I should look to get some idea?

Thank you.

Best regards,
/Gayashan

On Tue, Sep 12, 2017 at 12:26 AM, Bobby Evans 
wrote:

> A lot of that depends on the version of storm you have, and it is not a use
> case that we have played around with too much.
>
> Just looking at a few of our production clusters, the CPU usage is very
> small (but estimating the CPU usage on a Raspberry Pi ARM chip when all
> you have is the usage on a server CPU is hard to do).  The memory usage,
> however, can be somewhat high, especially for a Raspberry Pi.
> We are setting the heap size for the supervisor on our clusters to 256MB,
> but the heap size on the logviewer is 768MB.  But we have not tuned these
> at all, so it could easily be a lot smaller and still work.
>
> - Bobby
>
>
>
> On Sun, Sep 10, 2017 at 4:36 AM Gayashan Amarasinghe <
> gayashan.amarasin...@gmail.com> wrote:
>
> > Hi devs,
> >
> > What are the bare minimum requirements to run a Storm worker on a low
> > resource environment such as a raspberry pi node where bandwidth
> > consumption and power consumption could be critical?
> >
> > Thank you.
> >
> > Best regards,
> > /Gayashan
> >
>


[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2319
  
Before looking into the details: based on the design doc, and as I commented 
on the JIRA issue, I don't see a reason to store the cache on disk, since ZK 
is still the source of truth, and whenever a process starts it should sync up 
with ZK because the cache may be outdated. Keeping the cache in memory looks 
sufficient.
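
To illustrate the point about syncing on startup, here is a minimal Java 
sketch. It is not code from the pull request: the class name 
`InMemoryAssignmentCacheSketch` and its methods are hypothetical, and a plain 
`Map` stands in for whatever snapshot would actually be read from ZK.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: an in-memory cache that is rebuilt from the source of truth. */
class InMemoryAssignmentCacheSketch {
    // Topology id -> serialized assignment (strings used here only for illustration).
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    /** Store or overwrite a single cached assignment. */
    void put(String topologyId, String assignment) {
        cache.put(topologyId, assignment);
    }

    /** Return the cached assignment, or null if none is cached. */
    String get(String topologyId) {
        return cache.get(topologyId);
    }

    /**
     * Discard whatever is cached and reload from an authoritative snapshot.
     * In the scenario discussed above the snapshot would come from ZK (an assumption here).
     */
    void syncFrom(Map<String, String> authoritativeSnapshot) {
        cache.clear();
        cache.putAll(authoritativeSnapshot);
    }
}
```

Because `syncFrom` would run on every process start, nothing persisted from a 
previous run would be trusted anyway, which is the argument for not writing 
the cache to disk.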


---


[GitHub] storm issue #2324: [STORM-2730] Add in config options for acker cpu and memo...

2017-09-13 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2324
  
+1


---


[GitHub] storm pull request #2317: STORM-2675 (1.x): Fix storm-kafka-client Trident s...

2017-09-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2317


---


[GitHub] storm issue #2317: STORM-2675 (1.x): Fix storm-kafka-client Trident spout fa...

2017-09-13 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2317
  
Built locally and verified that it passes with JDK 7. +1


---


[GitHub] storm issue #2320: STORM-2736: fix o.a.s.b.BlobStoreUtils [ERROR] "Could not...

2017-09-13 Thread hmcc
Github user hmcc commented on the issue:

https://github.com/apache/storm/pull/2320
  
Rebased & squashed.


---


[GitHub] storm issue #2320: STORM-2736: fix o.a.s.b.BlobStoreUtils [ERROR] "Could not...

2017-09-13 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2320
  
Still +1


---


[GitHub] storm issue #2323: STORM-2736: fix o.a.s.b.BlobStoreUtils [ERROR] "Could not...

2017-09-13 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2323
  
Still +1


---


[GitHub] storm pull request #2323: STORM-2736: fix o.a.s.b.BlobStoreUtils [ERROR] "Co...

2017-09-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2323


---


[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

2017-09-13 Thread danny0405
Github user danny0405 commented on the issue:

https://github.com/apache/storm/pull/2319
  
@HeartSaVioR @revans2 
I have changed the default backend to the in-memory one, and I have also 
changed the distribution mode to asynchronous.
I chose a RocksDB backend because, when the cluster grows large, the 
assignments info takes up more cache memory (in memory mode), but if this is 
not a factor, we can just remove it.
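
For readers following the asynchronous mode, the idea of shuffling requests 
onto bounded per-thread queues and dropping them when a queue is full can be 
sketched roughly as below. This is an illustrative sketch only, not the class 
from the pull request: `ShuffleQueuesSketch`, `offer`, and `pending` are 
hypothetical names, and node ids are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of shuffle-and-drop distribution onto bounded queues. */
class ShuffleQueuesSketch {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();
    private final Random random = new Random();

    ShuffleQueuesSketch(int threads, int queueSize) {
        // One bounded queue per (future) worker thread.
        for (int i = 0; i < threads; i++) {
            queues.add(new LinkedBlockingQueue<String>(queueSize));
        }
    }

    /**
     * Pick a queue at random and try to enqueue without blocking.
     * Returns false when the chosen queue is full, meaning the request is dropped
     * and the supervisor is expected to pick up its assignments on its next sync.
     */
    boolean offer(String node) {
        int idx = random.nextInt(queues.size());
        return queues.get(idx).offer(node);
    }

    /** Number of requests currently waiting in the queue with the given index. */
    int pending(int idx) {
        return queues.get(idx).size();
    }
}
```

With a single queue of capacity 2, the first two `offer` calls succeed and the 
third returns false, so the caller is never blocked. In a full service each 
queue would be drained by its own worker thread.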


---


[GitHub] storm issue #2323: STORM-2736: fix o.a.s.b.BlobStoreUtils [ERROR] "Could not...

2017-09-13 Thread hmcc
Github user hmcc commented on the issue:

https://github.com/apache/storm/pull/2323
  
Rebased & squashed.


---


[GitHub] storm pull request #2320: STORM-2736: fix o.a.s.b.BlobStoreUtils [ERROR] "Co...

2017-09-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2320


---