[GitHub] storm pull request: Exhibitor support
Github user lazyval commented on the pull request: https://github.com/apache/storm/pull/432#issuecomment-74532426

For those who wonder what Exhibitor is: [it's a ZooKeeper supervisor used for monitoring, backup, and visualization](https://github.com/Netflix/exhibitor). @atdixon, are you sure these properties should be kept in `storm.yaml`? Why should Storm be aware of them? Is there a ticket backing this change? It's not clear what the benefits of such support are.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (STORM-503) Short disruptor queue wait time leads to high CPU usage when idle
[ https://issues.apache.org/jira/browse/STORM-503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14323079#comment-14323079 ]

Eric commented on STORM-503:

It seems that Storm still uses an old version of the Disruptor; the project has moved from Google Code to GitHub, and the artifactId has changed since. I expect that if we use the latest version of the Disruptor, it will benefit from this commit: https://github.com/LMAX-Exchange/disruptor/commit/8870a417bff5aed07825fc366b8f470d3561c838#diff-3a4074986e8b9e9f3a9cd82470d357ea

Short disruptor queue wait time leads to high CPU usage when idle
    Key: STORM-503
    URL: https://issues.apache.org/jira/browse/STORM-503
    Project: Apache Storm
    Issue Type: Bug
    Affects Versions: 0.9.2-incubating, 0.9.1-incubating, 0.9.3
    Reporter: Milad Fatenejad
    Priority: Minor

I am fairly new to Storm, but I observed some behavior which I believe may be unintended and wanted to report it. I was experimenting with a topology that had a large number of threads (30) and was running on a single node for test purposes, and noticed that even when no tuples were being processed there was over 100% CPU utilization. I became concerned and investigated by attempting to reproduce the problem with a very simple topology. I took the WordCountTopology from storm-starter and ran it in an Ubuntu VM. I increased the sleep time in the RandomSentenceSpout that feeds the topology to 10 seconds so that there was effectively no work to do, then modified the topology so that there were 30 threads for each bolt and only one instance of the spout. When I ran the topology I again saw 100% CPU usage when idle, even on this very simple topology. After extensive experimentation (netty vs. zeromq; 0.9.3, 0.9.2, 0.9.1; multiple JVM versions) I used YourKit and found that the high utilization was coming from DisruptorQueue.consumeBatchWhenAvailable, where there is this code:

    final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);

I increased the timeout to 100 ms and was able to reduce the CPU utilization when idle. I am new to Storm, so I am not sure what effect modifying this number has. Is this expected behavior from Storm? I would like to propose making this wait configurable if possible. Thank you.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
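The effect the reporter describes can be illustrated without the Disruptor at all: an idle consumer that returns from a timed wait every 10 ms wakes roughly ten times as often as one that waits 100 ms, and each wakeup costs CPU even when there is no work. A minimal sketch using plain java.util.concurrent (not Storm's actual DisruptorQueue; the class and method names below are illustrative):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class IdleWaitDemo {

    // Count how many times an idle consumer wakes from its timed wait during
    // `runMs` milliseconds when each wait lasts `waitMs` milliseconds and no
    // producer ever signals (i.e. every wait ends in a timeout).
    static long idleWakeups(long waitMs, long runMs) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition available = lock.newCondition();
        long wakeups = 0;
        long deadline = System.currentTimeMillis() + runMs;
        while (System.currentTimeMillis() < deadline) {
            lock.lock();
            try {
                // Times out every iteration: nothing ever calls signal().
                available.await(waitMs, TimeUnit.MILLISECONDS);
            } finally {
                lock.unlock();
            }
            wakeups++;
        }
        return wakeups;
    }

    public static void main(String[] args) throws InterruptedException {
        long shortWait = idleWakeups(10, 1000);   // ~100 wakeups/s when idle
        long longWait = idleWakeups(100, 1000);   // ~10 wakeups/s when idle
        System.out.println("10 ms wait: " + shortWait
                + " wakeups/s; 100 ms wait: " + longWait + " wakeups/s");
    }
}
```

The exact counts depend on scheduler granularity, but the ratio shows why bumping the `waitFor` timeout from 10 ms to 100 ms reduces idle CPU, at the cost of up to 100 ms extra latency for the first tuple after an idle period.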
[GitHub] storm pull request: Exhibitor support
Github user atdixon commented on the pull request: https://github.com/apache/storm/pull/432#issuecomment-74537246

ZooKeeper and its clients require a static list of hosts in their configuration [1]. Exhibitor (in addition to supervision, backup, etc.) can provide the ZooKeeper server list to Curator clients (via Curator's Exhibitor support [2]), so that the client can dynamically update its list of ZooKeeper servers. This provides a more robust connection to our ZooKeeper ensemble from Storm (supporting ZooKeeper cluster scale-up/down, whole-node replacements, rolling upgrades, outages, etc., all without restarting Storm). It also allows Storm to discover our ZooKeeper cluster through curator-exhibitor instead of requiring the full ZooKeeper ensemble to be hard-coded into the Storm configuration. With this patch, you can (optionally) provide Exhibitor servers *in place of* ZooKeeper servers in the Storm config, and Storm will discover your ZooKeeper ensemble. This is why it should go into storm.yaml.

[1] http://zookeeper.apache.org/doc/r3.5.0-alpha/zookeeperProgrammers.html#ch_gotchas
[2] http://curator.apache.org/exhibitor.html
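To make the "Exhibitor servers in place of ZooKeeper servers" idea concrete, a storm.yaml under this patch might look roughly as follows. The key names (`storm.exhibitor.servers`, `storm.exhibitor.port`) are a sketch and should be verified against the merged patch; only `storm.zookeeper.servers` is a long-standing Storm config:

```
# Hypothetical storm.yaml sketch: discover the ZooKeeper ensemble via
# Exhibitor instead of hard-coding ZooKeeper hosts. Key names for the
# Exhibitor settings are illustrative, not confirmed from the patch.
storm.exhibitor.servers:
  - "exhibitor1.example.com"
  - "exhibitor2.example.com"
storm.exhibitor.port: 8080

# Without Exhibitor, the ensemble must instead be listed statically:
# storm.zookeeper.servers:
#   - "zk1.example.com"
#   - "zk2.example.com"
#   - "zk3.example.com"
```

The point of the patch is that only the (comparatively stable) Exhibitor endpoints need to be known up front; the actual ZooKeeper membership is fetched and refreshed at runtime.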
[GitHub] storm pull request: STORM-329: fix cascading Storm failure by impr...
Github user miguno commented on a diff in the pull request: https://github.com/apache/storm/pull/429#discussion_r24738502

--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -42,344 +42,577 @@
 import org.slf4j.LoggerFactory;

 import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
 import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;

-public class Client implements IConnection, IStatefulObject {
+/**
+ * A Netty client for sending task messages to a remote destination (Netty server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the expense of network latency. The message
+ *   batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ *   - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
+ *     the remote destination is currently unavailable.
+ * - A background flusher thread is run in the background. It will, at fixed intervals, check for any pending messages
+ *   (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
+ *   enabled.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject {

     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
-    private final int max_retries;
-    private final int base_sleep_ms;
-    private final int max_sleep_ms;
+    private static final long NO_DELAY_MS = 0L;
+    private static final long MINIMUM_INITIAL_DELAY_MS = 3L;
+    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 60L;
+    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
+
     private final StormBoundedExponentialBackoffRetry retryPolicy;
-    private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
-    private InetSocketAddress remote_addr;
-
-    private AtomicInteger totalReconnects;
-    private AtomicInteger messagesSent;
-    private AtomicInteger messagesLostReconnect;
-    private final Random random = new Random();
-    private final ChannelFactory factory;
-    private final int buffer_size;
-    private boolean closing;
-
-    private int messageBatchSize;
-
-    private AtomicLong pendings;
-
-    Map storm_conf;
+    private final InetSocketAddress dstAddress;
+    protected final String dstAddressPrefixedName;
+
+    /**
+     * The channel used for all write operations from this client to the remote destination.
+     */
+    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
+
+    /**
+     * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
+     */
+    private final int maxReconnectionAttempts;
+
+    /**
+     * Total number of connection attempts.
+     */
+    private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of connection attempts since the last disconnect.
+     */
+    private final AtomicInteger connectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of messages successfully sent to the remote destination.
+     */
+    private final AtomicInteger messagesSent = new AtomicInteger(0);
+
+    /**
+     * Number of messages that could not be sent to the remote destination.
+     */
+    private final AtomicInteger messagesLost = new AtomicInteger(0);
+
+    /**
+     * Number of messages buffered in memory.
+     */
+    private final AtomicLong pendingMessages = new AtomicLong(0);
+
+    /**
+     * This flag is set to true if and only if a client instance is being closed.
+     */
+    private volatile boolean closing = false;
+
+    /**
+     * When set to true, then the background flusher thread will flush any pending messages on its next run.
+     */
+    private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(false);
+
+    /**
+     * The absolute time (in ms) when the next background flush should be performed.
+     *
+     * Note: The
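The class javadoc above describes a batch-then-flush design: messages accumulate until a batch is full, and a background thread periodically flushes partial batches so low-traffic destinations still see their messages. A minimal, self-contained sketch of that pattern (not Storm's actual Client; class and method names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the batching + background-flush pattern: a full batch is sent
// immediately, while a scheduled task flushes any lingering partial batch.
public class BatchingSender {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<String>();
    // Stand-in for the network channel: records every batch "sent".
    private final List<List<String>> sent = new ArrayList<List<String>>();

    public BatchingSender(int batchSize) {
        this.batchSize = batchSize;
    }

    // Enqueue a message; flush as soon as a full batch has accumulated.
    public synchronized void send(String msg) {
        buffer.add(msg);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Flush whatever is buffered, even a partial batch. This is also what
    // the background flusher invokes at fixed intervals.
    public synchronized void flush() {
        if (!buffer.isEmpty()) {
            sent.add(new ArrayList<String>(buffer));
            buffer.clear();
        }
    }

    public synchronized List<List<String>> sentBatches() {
        return new ArrayList<List<String>>(sent);
    }

    // Start a background flusher that drains pending messages periodically,
    // so a partial batch never waits longer than intervalMs to be sent.
    public ScheduledExecutorService startBackgroundFlusher(long intervalMs) {
        ScheduledExecutorService flusher = Executors.newSingleThreadScheduledExecutor();
        flusher.scheduleAtFixedRate(new Runnable() {
            public void run() {
                flush();
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return flusher;
    }
}
```

The trade-off is the one the javadoc names: larger batches improve throughput, while the flush interval bounds the extra latency a partially filled batch can incur.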
[jira] [Commented] (STORM-329) Add Option to Config Message handling strategy when connection timeout
[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14322547#comment-14322547 ]

ASF GitHub Bot commented on STORM-329:

Github user miguno commented on a diff in the pull request: https://github.com/apache/storm/pull/429#discussion_r24738502