[GitHub] storm pull request: Exhibitor support

2015-02-16 Thread lazyval
Github user lazyval commented on the pull request:

https://github.com/apache/storm/pull/432#issuecomment-74532426
  
For those who wonder what Exhibitor is -- [it's a zookeeper supervisor 
used for monitoring, backup, and 
visualization](https://github.com/Netflix/exhibitor). 

@atdixon are you sure these properties should be kept in `storm.yaml`? Why 
should storm be aware of them? Is there a ticket backing this change? It's 
not clear what the benefits of such support are.




[jira] [Commented] (STORM-503) Short disruptor queue wait time leads to high CPU usage when idle

2015-02-16 Thread Eric (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14323079#comment-14323079
 ] 

Eric commented on STORM-503:


It seems that Storm still uses an old version of the LMAX Disruptor. The 
project has moved from Google Code to GitHub, and its Maven group ID has 
changed since (from com.googlecode.disruptor to com.lmax).

I expect that if we used the latest version of Disruptor, Storm would benefit 
from this commit:
https://github.com/LMAX-Exchange/disruptor/commit/8870a417bff5aed07825fc366b8f470d3561c838#diff-3a4074986e8b9e9f3a9cd82470d357ea
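
For illustration, newer Disruptor releases (published under the com.lmax 
group ID) ship wait strategies such as TimeoutBlockingWaitStrategy that let 
consumers block while idle instead of waking on a short poll. A minimal 
sketch, assuming Disruptor 3.x; the event type and buffer size are arbitrary:

    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;

    // Minimal sketch, assuming Disruptor 3.x (com.lmax:disruptor). The
    // consumer parks until an event arrives or the 100 ms timeout elapses,
    // instead of re-polling a barrier every 10 ms while idle.
    public class BlockingConsumerSketch {
        static class Event {
            long value;
        }

        public static void main(String[] args) {
            Disruptor<Event> disruptor = new Disruptor<Event>(
                    new EventFactory<Event>() {
                        public Event newInstance() { return new Event(); }
                    },
                    1024, // ring buffer size (must be a power of two)
                    Executors.newCachedThreadPool(),
                    ProducerType.SINGLE,
                    new TimeoutBlockingWaitStrategy(100, TimeUnit.MILLISECONDS));
            disruptor.handleEventsWith(new EventHandler<Event>() {
                public void onEvent(Event event, long sequence, boolean endOfBatch) {
                    System.out.println(event.value);
                }
            });
            disruptor.start();
        }
    }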

 Short disruptor queue wait time leads to high CPU usage when idle
 -----------------------------------------------------------------

 Key: STORM-503
 URL: https://issues.apache.org/jira/browse/STORM-503
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 0.9.2-incubating, 0.9.1-incubating, 0.9.3
Reporter: Milad Fatenejad
Priority: Minor

 I am fairly new to Storm, but I observed some behavior which I believe may be 
 unintended and wanted to report it...
 I was experimenting with Storm on a topology which had a large number of 
 threads (30) and was running on a single node for test purposes, and noticed 
 that even when no tuples were being processed, there was over 100% CPU 
 utilization.
 I became concerned and investigated by attempting to reproduce this with a 
 very simple topology. I took the WordCountTopology from storm-starter and ran 
 it in an Ubuntu VM. I increased the sleep time in the RandomSentenceSpout that 
 feeds the topology to 10 seconds so that there was effectively no work to do. 
 I then modified the topology so that there were 30 threads for each bolt and 
 only one instance of the spout. When I ran the topology I noticed that there 
 was again 100% CPU usage when idle, even on this very simple topology. After 
 extensive experimentation (netty vs. zeromq; 0.9.3, 0.9.2, and 0.9.1; multiple 
 JVM versions) I used YourKit and found that the high utilization was coming 
 from DisruptorQueue.consumeBatchWhenAvailable, where there is this code:
 final long availableSequence = _barrier.waitFor(nextSequence, 10, 
 TimeUnit.MILLISECONDS);
 I increased the wait to 100 ms and was able to reduce the CPU utilization when 
 idle. I am new to Storm, so I am not sure what effect modifying this number 
 has. Is this expected behavior from Storm? I would like to propose making this 
 wait configurable if possible...
 Thank You
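
A minimal sketch of the proposed configurable wait; the config key below is 
hypothetical, not an existing Storm option:

    import java.util.Map;

    // Minimal sketch: read a consumer wait timeout from the topology config.
    // The key "topology.disruptor.wait.timeout.millis" is hypothetical, not
    // an actual Storm config option.
    public final class DisruptorWaitTime {
        private static final long DEFAULT_WAIT_MS = 10L;

        private DisruptorWaitTime() {}

        public static long fromConf(Map conf) {
            Object value = conf.get("topology.disruptor.wait.timeout.millis");
            return (value instanceof Number)
                    ? ((Number) value).longValue()
                    : DEFAULT_WAIT_MS;
        }
    }

DisruptorQueue.consumeBatchWhenAvailable would then pass the configured value 
instead of the hard-coded 10:

    // long waitMs = DisruptorWaitTime.fromConf(stormConf);
    // final long availableSequence =
    //         _barrier.waitFor(nextSequence, waitMs, TimeUnit.MILLISECONDS);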





[GitHub] storm pull request: Exhibitor support

2015-02-16 Thread atdixon
Github user atdixon commented on the pull request:

https://github.com/apache/storm/pull/432#issuecomment-74537246
  
Zookeeper and its clients require a static list of hosts in their 
configuration [1]. Exhibitor (in addition to supervision, backup, etc.) can 
provide the Zookeeper server list to Curator clients (via Curator's Exhibitor 
support [2]), so that the client can dynamically update its list of Zookeeper 
servers.

This gives Storm a more robust connection to our Zookeeper ensemble 
(supporting Zookeeper cluster scale-up/down, whole-node replacements, rolling 
upgrades, outages, etc., all without restarting Storm). It also allows Storm 
to discover our Zookeeper cluster through curator-exhibitor instead of 
requiring the full Zookeeper ensemble to be hard-coded into the Storm 
configuration.

With this patch, you can (optionally) provide Exhibitor servers *in place 
of* Zookeeper servers in the Storm config, and Storm will discover your 
Zookeeper ensemble. This is why it should go into storm.yaml.

[1] 
http://zookeeper.apache.org/doc/r3.5.0-alpha/zookeeperProgrammers.html#ch_gotchas
[2] http://curator.apache.org/exhibitor.html
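
For reference, a minimal sketch of wiring up Curator's Exhibitor support [2]; 
the host names, ports, and polling interval are illustrative only:

    import java.util.Arrays;

    import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
    import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
    import org.apache.curator.ensemble.exhibitor.Exhibitors;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    // Minimal sketch of discovering a Zookeeper ensemble via Exhibitor using
    // Curator's ExhibitorEnsembleProvider. All host names, ports, and
    // intervals below are made up for illustration.
    public class ExhibitorDiscoverySketch {
        public static void main(String[] args) throws Exception {
            Exhibitors exhibitors = new Exhibitors(
                    Arrays.asList("exhibitor1.example.com", "exhibitor2.example.com"),
                    8080, // Exhibitor REST port
                    new Exhibitors.BackupConnectionStringProvider() {
                        public String getBackupConnectionString() {
                            // Fallback if no Exhibitor instance is reachable.
                            return "zk1.example.com:2181";
                        }
                    });
            ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
            ExhibitorEnsembleProvider ensembleProvider = new ExhibitorEnsembleProvider(
                    exhibitors,
                    new DefaultExhibitorRestClient(),
                    "/exhibitor/v1/cluster/list", // Exhibitor's cluster-list REST endpoint
                    60000,                        // re-poll the server list every 60 s
                    retry);
            ensembleProvider.pollForInitialEnsemble();
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .ensembleProvider(ensembleProvider)
                    .retryPolicy(retry)
                    .build();
            client.start();
        }
    }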




[GitHub] storm pull request: STORM-329: fix cascading Storm failure by impr...

2015-02-16 Thread miguno
Github user miguno commented on a diff in the pull request:

https://github.com/apache/storm/pull/429#discussion_r24738502
  
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -42,344 +42,577 @@
 import org.slf4j.LoggerFactory;
 
 import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
 import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;
 
-public class Client implements IConnection, IStatefulObject{
+/**
+ * A Netty client for sending task messages to a remote destination (Netty server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the expense of network latency.  The message
+ *   batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ * - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
+ *   the remote destination is currently unavailable.
+ * - A background flusher thread is run in the background.  It will, at fixed intervals, check for any pending messages
+ *   (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
+ *   enabled.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject {
+
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
-    private final int max_retries;
-    private final int base_sleep_ms;
-    private final int max_sleep_ms;
+    private static final long NO_DELAY_MS = 0L;
+    private static final long MINIMUM_INITIAL_DELAY_MS = 3L;
+    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 60L;
+    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
+
     private final StormBoundedExponentialBackoffRetry retryPolicy;
-    private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
-    private InetSocketAddress remote_addr;
-
-    private AtomicInteger totalReconnects;
-    private AtomicInteger messagesSent;
-    private AtomicInteger messagesLostReconnect;
-    private final Random random = new Random();
-    private final ChannelFactory factory;
-    private final int buffer_size;
-    private boolean closing;
-
-    private int messageBatchSize;
-
-    private AtomicLong pendings;
-
-    Map storm_conf;
+    private final InetSocketAddress dstAddress;
+    protected final String dstAddressPrefixedName;
+
+    /**
+     * The channel used for all write operations from this client to the remote destination.
+     */
+    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
+
+
+    /**
+     * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
+     */
+    private final int maxReconnectionAttempts;
+
+    /**
+     * Total number of connection attempts.
+     */
+    private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of connection attempts since the last disconnect.
+     */
+    private final AtomicInteger connectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of messages successfully sent to the remote destination.
+     */
+    private final AtomicInteger messagesSent = new AtomicInteger(0);
+
+    /**
+     * Number of messages that could not be sent to the remote destination.
+     */
+    private final AtomicInteger messagesLost = new AtomicInteger(0);
+
+    /**
+     * Number of messages buffered in memory.
+     */
+    private final AtomicLong pendingMessages = new AtomicLong(0);
+
+    /**
+     * This flag is set to true if and only if a client instance is being closed.
+     */
+    private volatile boolean closing = false;
+
+    /**
+     * When set to true, then the background flusher thread will flush any pending messages on its next run.
+     */
+    private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(false);
+
+    /**
+     * The absolute time (in ms) when the next background flush should be performed.
+     *
+     * Note: The 

[jira] [Commented] (STORM-329) Add Option to Config Message handling strategy when connection timeout

2015-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14322547#comment-14322547
 ] 

ASF GitHub Bot commented on STORM-329:
--

Github user miguno commented on a diff in the pull request:

https://github.com/apache/storm/pull/429#discussion_r24738502
  