Re: Spout sleep wait strategy

2017-08-15 Thread Brian Taylor
Unsubscribe

On Aug 15, 2017, at 10:34 AM, Mahak Goel wrote:
>
>Hi,
>
>I know I can configure a sleep wait strategy in the defaults.yaml and
>that will apply to all spouts in the topology. Is there a way to do
>this on a spout by spout basis? That is, is there a way to configure
>different times for different spouts?
>
>Thanks!


Re: Re: RE: How to Improve Storm Application's Throughput

2017-08-09 Thread Brian Taylor
Unsubscribe

On Aug 9, 2017, at 8:40 AM, "Hannum, Daniel" wrote:
>I think the problem is that capacity of 3.5. It indicates that there’s
>a backlog on that bolt: the actual time spent processing in the bolt is
>small, but the total time spent (including wait time) is large. Scale
>the bolts up, scale the spout down, or make the bolt faster.
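
For reference, the Capacity figure in the Storm UI is computed as (number of tuples executed * average execute latency) / measurement window, over the last 10 minutes. A minimal sketch of that arithmetic follows; the class name, method name and the sample numbers are made up for illustration and are not taken from the screenshots in this thread.

```java
public class CapacityEstimate {

    // Capacity = executed tuples * average execute latency (ms) / window length (ms).
    // A value near or above 1.0 means the executor was busy for the whole window.
    static double capacity(long executed, double executeLatencyMs, long windowMs) {
        return executed * executeLatencyMs / windowMs;
    }

    public static void main(String[] args) {
        long windowMs = 10 * 60 * 1000L; // the UI reports capacity over the last 10 minutes

        // Hypothetical executor: 1,200,000 tuples at 0.5 ms each fills the window exactly.
        System.out.println(capacity(1_200_000L, 0.5, windowMs)); // prints 1.0

        // Hypothetical executor: same latency, a quarter of the tuples.
        System.out.println(capacity(300_000L, 0.5, windowMs));   // prints 0.25
    }
}
```

Because the figure is per executor, two executors with the same execute latency can show very different capacity when one of them receives far more tuples than the other.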
>
>From: "fanxi...@travelsky.com" 
>Reply-To: "user@storm.apache.org" 
>Date: Tuesday, August 8, 2017 at 11:02 PM
>To: user , libo19 , kabhwan
>
>Subject: Re: RE: How to Improve Storm Application's Throughput
>
>Hi LiBo, Jungtaek:
>
>Yes, Storm tuning depends on the situation. Thank you for your kind
>advice.
>The following is one of my situations; any hints from you would be
>appreciated. The Storm version is 1.0.0.
>The topology has just one spout, which reads messages from Kafka, and
>one bolt, which parses each message and writes it to HBase.
>[cid:image001.png@01D310E7.19BFB400]
>As you can see from the picture above, the Execute latency of the bolt
>is small (0.5 ms), but the Complete latency is much larger (4365 ms),
>which slows down the throughput of the topology.
>Which part consumes so much additional time? The transfer between
>the spout and the bolt? Or the acking? I tried to increase the
>parallelism of the components, but it did not help.
>Is there a tool to analyze where the time goes in general? It would be
>great to know of one.
>
>There is another thing to explain in the picture above. The Capacity
>appears to be as high as 1.617, but there are 64 bolts, and the Capacity
>of most of them is low, as the pictures below show.
>[cid:image002.png@01D310E7.19BFB400]
>[cid:image003.png@01D310E7.19BFB400]
>So another puzzle is that the Execute latency and the Executed count
>are about equal, yet the Capacity turns out to be so different. Any
>hints?
>
>The following is another topology.
>[cid:image004.png@01D310E7.19BFB400]
>Maybe the history_Put bolt has both a high Capacity and a larger Execute
>latency, and this would definitely lead to a Complete latency of 56964 ms?
>
>Thank you all for your time.
>
>
>
>Joshua
>
>
>
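>From: 李波
>Sent: 2017-08-07 16:56
>To: user@storm.apache.org
>Cc:
>jiangyc_cui...@si-tech.com.cn;
>'zhangxl_bds'
>Subject: RE: How to Improve Storm Application's Throughput
>Hello!
>
>Troubleshooting Storm performance takes repeated experimentation until
>you arrive at values that work in practice. My own process is as
>follows; I hope it is of some help:
>
>1. Kafka partition number and KafkaSpout's parallelism
>First, determine whether the delay comes from the KafkaSpout itself not
>consuming fast enough, or from downstream bolts with limited processing
>capacity becoming congested and backing up the upstream KafkaSpout as
>well.
>If the KafkaSpout cannot keep up, increase the number of Kafka
>partitions and set the KafkaSpout parallelism equal to the partition
>count, raising both step by step until the throughput requirement is met.
>
>2. Bolt's business logic and Bolt's parallelism
>First, check whether the hardware resources are exhausted.
>Next, find out which bolt is causing the congestion. The topology
>visualization in the Storm UI helps here: the redder a bolt, the worse
>its congestion. Also look at the Capacity value: a bolt above 1 will be
>congested to some degree.
>Then either optimize the processing logic according to your own
>algorithm, or raise the bolt's processing capacity by increasing its
>parallelism (provided the hardware resources are not yet at their limit).
>
>From your description, the later bolts appear to access an external data
>store to persist the final results. Check whether reads and writes there
>have high latency and whether the target system is under heavy load.
>
>In most cases the cause is insufficient bolt processing capacity. You
>need to find the pressure points, optimize the business logic, and adjust
>the bolt parallelism at the same time.
>Check whether the bolt's logic is compute-intensive or depends on
>external I/O; if it is compute-intensive, the only option is to add
>workers.
>
>3. Config.TOPOLOGY_BACKPRESSURE_ENABLE
>Check whether the backpressure mechanism is enabled via
>Config.TOPOLOGY_BACKPRESSURE_ENABLE (I believe it is only available from
>Storm 1.0.0 onward).
>
>4. Netty
>To raise Storm's throughput at the transport layer, you can also change
>the Netty settings in storm.yaml to increase Netty's send batch size.
>
>5. Executor's throughput params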
>// Net io set
>config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 1024 * 16); // default is 1024
>config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 1024 * 16); // batched; default is 1024
>config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 1024 * 16); // individual tuples; default is 1024
>config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 200);
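
One thing to keep in mind when changing the buffer settings above: in Storm 1.x these sizes back Disruptor ring buffers and must be powers of two. The sketch below checks candidate values before they go into the topology config. It uses a plain map with the string keys behind the Config constants instead of the Storm classes so that it runs on its own; the class name and the isPowerOfTwo helper are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BufferSizeCheck {

    // True when n is a positive power of two (exactly one bit set).
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    public static void main(String[] args) {
        // Plain map standing in for the topology Config (assumption: the same
        // key/value pairs would be put into the real Config object).
        Map<String, Object> conf = new LinkedHashMap<>();
        conf.put("topology.transfer.buffer.size", 1024 * 16);
        conf.put("topology.executor.receive.buffer.size", 1024 * 16);
        conf.put("topology.executor.send.buffer.size", 1024 * 16);

        // Report every buffer size and whether it is a valid ring buffer size.
        for (Map.Entry<String, Object> entry : conf.entrySet()) {
            int size = (Integer) entry.getValue();
            System.out.println(entry.getKey() + " = " + size
                    + (isPowerOfTwo(size) ? " (ok)" : " (not a power of two)"));
        }
    }
}
```

1024 * 16 is 16384, which is 2^14, so all three values pass the check.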
>
>
>
>李波 13813887096 lib...@asiainfo.com
>北京亚信智慧数据科技有限公司
>AsiaInfo is my home; its growth depends on everyone
>
>From: 王 纯超 [mailto:wangchunc...@outlook.com]
>Sent: August 7, 2017 10:58
>To: user 
>Cc: 姜艳春jiangyc_cui...@si-tech.com.cn ;
>zhangxl_bds 
>Subject: How to Improve Storm Application's Throughput
>
>Hi,
>
>I am now looking to improve a Storm application's throughput because I
>find that the consumption speed of the KafkaSpout is slower than the
>producing speed, and the lag keeps growing. Below are the bolt
>statistics. I tried moving the tuple projection and filtering logic
>forward into a custom scheme with the intention of reducing network
>traffic. However, from what I observe, things went contrary to my wishes.
>Am I going the wrong way? Are there any principles for tuning Storm
>applications? Or could anyone give some suggestions for this specific
>case?
>[cid:image005.jpg@01D310E7.19BFB400]

Re: Is there a reason storm has a default of 4 ports?

2016-07-20 Thread Brian Taylor
It's not recommended to run more than one topology on the same server
cluster, as it can make it difficult to monitor and optimize. I usually only
use one slot (JVM) per node. I remember reading somewhere that this is
recommended too.

On Wed, Jul 20, 2016 at 7:22 AM, Navin Ipe <navin@searchlighthealth.com>
wrote:

> From what I know, you can figure out how much memory your application
> needs and allocate the memory as shown below. The below code allocates 2GiB
> of memory for each worker of the topology.
>
> Config stormConfig = new Config();
> stormConfig.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
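
To put rough numbers on the questions in this thread, here is a small sketch of the slot arithmetic. It counts maximum heap only (a JVM also uses memory outside the heap), and the class and method names are made up for illustration.

```java
public class WorkerMemoryEstimate {

    // Upper bound on heap used on one supervisor node when every slot runs a worker.
    static long totalHeapMiB(int slotsPerSupervisor, long heapPerWorkerMiB) {
        return slotsPerSupervisor * heapPerWorkerMiB;
    }

    // Number of worker slots available across the whole cluster.
    static int totalSlots(int supervisors, int slotsPerSupervisor) {
        return supervisors * slotsPerSupervisor;
    }

    public static void main(String[] args) {
        // 4 slots per node with -Xmx2g per worker.
        System.out.println(totalHeapMiB(4, 2048) + " MiB"); // prints 8192 MiB

        // 5 supervisors with 5 slots each.
        System.out.println(totalSlots(5, 5) + " slots");    // prints 25 slots
    }
}
```

Every topology needs at least one worker, so 25 slots allow at most 25 topologies, and only if each of them runs with a single worker.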
>
> On Tue, Jul 19, 2016 at 10:20 PM, Joaquin Menchaca <jmench...@gobalto.com>
> wrote:
>
>> How can one calculate how much memory is needed?
>>
>> On Tue, Jul 19, 2016 at 1:05 AM, Navin Ipe <
>> navin@searchlighthealth.com> wrote:
>>
>>> I've figured out the answer to this. A slot is used by a worker. A
>>> worker is a JVM. So each JVM would require a clump of heap memory of its
>>> own. So a default of 4 slots would use 4*x amount of memory, where x is the
>>> memory used by a worker JVM.
>>> Now obviously if you declare more than 4 ports, it'll take up that much
>>> more memory.
>>> The problem with taking up too much memory, is that your topologies will
>>> suddenly crash with a GC overhead limit exceeded exception and the spout or
>>> bolt will get re-started constantly.
>>> As I understand, you'd be better off with increasing the number of
>>> servers or RAM on the existing server, if you want to have many
>>> workers/topologies.
>>>
>>>
>>> On Mon, Jul 18, 2016 at 3:43 PM, Navin Ipe <
>>> navin@searchlighthealth.com> wrote:
>>>
>>>> Ok, if there's an answer to the first question, then anyone who knows
>>>> details about Storm's design, please help in this thread. For the second
>>>> question, I'll be starting a separate thread, since there would be people
>>>> who'd have experience with running multiple topologies.
>>>>
>>>> On Mon, Jul 18, 2016 at 12:12 PM, Navin Ipe <
>>>> navin@searchlighthealth.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I was initially surprised that Storm couldn't run more than 4
>>>>> topologies by default. On increasing the number of supervisor slots, I was
>>>>> able to run more topologies. But there are two things I don't understand:
>>>>>
>>>>> 1. Was Storm designed to support only 4 default slots because it has
>>>>> to allocate memory for each slot and supporting 10 slots by default would
>>>>> have eaten up too much memory?
>>>>> 2. Since I have to go to each supervisor and alter each storm.yaml
>>>>> file to support more than 4 slots, then if I assign 5 slots to 5
>>>>> supervisors, will I be able to run 5*5=25 topologies? (I've tried it only
>>>>> on my local system until now).
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Navin
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Navin
>>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>
>>
>>
>> --
>>
>> Thus a victorious army wins first and then seeks battle; a defeated
>> army fights first and then seeks victory.
>>
>
>
>
> --
> Regards,
> Navin
>



-- 

Brian Taylor

Resolving Architecture .:.

330-812-7098

br...@resolvingarchitecture.com

http://resolvingarchitecture.com

www.linkedin.com/in/leanenterprisearchitect/
<http://www.linkedin.com/in/javadevops/>


Re: Connecting Storm Output to Node.js app

2016-02-24 Thread Brian Taylor
the map in the message manager constructor is from Storm on prepare


Re: Connecting Storm Output to Node.js app

2016-02-24 Thread Brian Taylor
and partitioner:

package com.company.rt.producers.utilities.partitioners;

import java.util.concurrent.atomic.AtomicInteger;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Round-robin partitioner: ignores the message key and cycles through
 * the partitions in order.
 */
public class KafkaRoundRobinPartitioner implements Partitioner {

    private static final Logger LOG =
            LoggerFactory.getLogger(KafkaRoundRobinPartitioner.class);

    private final AtomicInteger counter = new AtomicInteger(0);

    public KafkaRoundRobinPartitioner(VerifiableProperties props) {
        LOG.info("Instantiated the Storm Round Robin Partitioner class with properties:");
        for (String propName : props.props().stringPropertyNames()) {
            LOG.info("{}:{}", propName, props.props().getProperty(propName));
        }
    }

    @Override
    public int partition(Object key, int numberOfPartitions) {
        // An int can never exceed Integer.MAX_VALUE, so a reset check on that
        // condition never fires. Masking off the sign bit keeps the result
        // non-negative after the counter overflows.
        return (counter.incrementAndGet() & Integer.MAX_VALUE) % numberOfPartitions;
    }
}
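
The counter handling in partition() can be tried out without Kafka on the classpath. The following is a standalone sketch, with a made-up class name, that shows what happens when the counter wraps around: a plain modulo on the wrapped value is negative, which is not a valid partition id, while masking off the sign bit keeps the result in range.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinCounterDemo {

    private final AtomicInteger counter;

    RoundRobinCounterDemo(int start) {
        counter = new AtomicInteger(start);
    }

    // Next partition id in [0, numberOfPartitions), safe across int overflow.
    int next(int numberOfPartitions) {
        return (counter.incrementAndGet() & Integer.MAX_VALUE) % numberOfPartitions;
    }

    public static void main(String[] args) {
        // Normal operation: cycles 1, 2, 0, 1, 2 for three partitions.
        RoundRobinCounterDemo rr = new RoundRobinCounterDemo(0);
        for (int i = 0; i < 5; i++) {
            System.out.print(rr.next(3) + " ");
        }
        System.out.println();

        // Incrementing past Integer.MAX_VALUE wraps to Integer.MIN_VALUE,
        // and a plain modulo on that value is negative.
        int wrapped = new AtomicInteger(Integer.MAX_VALUE).incrementAndGet();
        System.out.println(wrapped % 3); // prints -2

        // With the sign bit masked off the result stays a valid partition id.
        RoundRobinCounterDemo nearOverflow = new RoundRobinCounterDemo(Integer.MAX_VALUE);
        System.out.println(nearOverflow.next(3)); // prints 0
    }
}
```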


On Wed, Feb 24, 2016 at 6:33 PM, Brian Taylor <
br...@resolvingarchitecture.com> wrote:

> Here's an example Kafka client:
>
> package com.company.rt.services.data;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> import com.company.models.Event;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
>
> /**
>  * Holds two producers: a "relaxed" one that waits for the leader's ack
>  * only, and a "durable" one that waits for all in-sync replicas.
>  */
> public final class MessageManager {
>
>     private Properties relaxedProducerProps;
>     private ProducerConfig relaxedProducerConfig;
>     private Producer<String,String> relaxedProducer;
>
>     private Properties durableProducerProps;
>     private ProducerConfig durableProducerConfig;
>     private Producer<String,String> durableProducer;
>
>     public MessageManager(Map map) {
>         // acks = 1: the leader has written the message
>         relaxedProducerProps = new Properties();
>         relaxedProducerProps.put("metadata.broker.list", map.get("topology.kafka.broker.list"));
>         relaxedProducerProps.put("serializer.class", "kafka.serializer.StringEncoder");
>         relaxedProducerProps.put("key.serializer.class", "kafka.serializer.StringEncoder");
>         relaxedProducerProps.put("request.required.acks", "1");
>         relaxedProducerProps.put("partitioner.class",
>                 "com.company.rt.producers.utilities.partitioners.KafkaRoundRobinPartitioner");
>         relaxedProducerConfig = new ProducerConfig(relaxedProducerProps);
>         relaxedProducer = new Producer<>(relaxedProducerConfig);
>
>         // acks = -1: all in-sync replicas have written the message
>         durableProducerProps = new Properties();
>         durableProducerProps.put("metadata.broker.list", map.get("topology.kafka.broker.list"));
>         durableProducerProps.put("serializer.class", "kafka.serializer.StringEncoder");
>         durableProducerProps.put("key.serializer.class", "kafka.serializer.StringEncoder");
>         durableProducerProps.put("request.required.acks", "-1");
>         durableProducerProps.put("partitioner.class",
>                 "com.company.rt.producers.utilities.partitioners.KafkaRoundRobinPartitioner");
>         durableProducerConfig = new ProducerConfig(durableProducerProps);
>         durableProducer = new Producer<>(durableProducerConfig);
>     }
>
>     // Sends a single message through the durable or the relaxed producer.
>     public void send(String topic, String message, boolean durable) throws Exception {
>         KeyedMessage<String,String> keyedMessage = new KeyedMessage<>(topic, "key", message);
>         if (durable) durableProducer.send(keyedMessage);
>         else relaxedProducer.send(keyedMessage);
>     }
>
>     // Sends a batch of messages in one call.
>     public void send(String topic, List<String> messages, boolean durable) throws Exception {
>         List<KeyedMessage<String, String>> keyedMessages = new ArrayList<>();
>         for (String message : messages) {
>             keyedMessages.add(new KeyedMessage<>(topic, "key", message));
>         }
>         if (durable) durableProducer.send(keyedMessages);
>         else relaxedProducer.send(keyedMessages);
>     }
>
>     // Sending Event objects is not implemented yet; the intended code is kept commented out.
>     public void send(String topic, Event event, boolean durable) throws Exception {
>         //KeyedMessage<String, Event> keyedMessage = new KeyedMessage<>(topic, "key", event);
>         //if(durable) durableProducer.send(keyedMessage);
>         //else relaxedProducer.send(keyedMessage);
>     }
>
> }
>
>
> On Wed, Feb 24, 2016 at 3:00 PM, david kavanagh &

Re: Connecting Storm Output to Node.js app

2016-02-24 Thread Brian Taylor


-- 

Warm Regards,
Brian Taylor

Resolving Architecture .:.

330-812-7098

br...@resolvingarchitecture.com

http://resolvingarchitecture.com

www.linkedin.com/in/javadevops/


Re: Connecting Storm Output to Node.js app

2016-02-24 Thread Brian Taylor
Have you tried https://www.npmjs.com/package/kafka-node ?

On Wed, Feb 24, 2016 at 2:28 PM, Patrick Wiener <patrick.wie...@web.de>
wrote:

> Have you considered using Redis?
>
> You could hook up Storm with Redis (e.g. through Jedis) and use Redis's
> built-in pub/sub mechanism for data retrieval from within Node.js.
>
>
> Regards,
> Patrick
>
>
> On 23.02.2016 at 19:47, david kavanagh <david_...@hotmail.com> wrote:
>
> Hello,
>
> I currently have Storm up and running on two Ubuntu VMs. I have Storm
> pulling data from a database on a MySQL Cluster, and now I am trying to
> figure out a way of getting Storm and a Node.js app on a remote VM
> communicating. I have tried to use a Kafka topic to store the output
> from Storm so the app can connect to the topic and gather the data,
> but I cannot get the KafkaBolt to work as it should. I have tried
> everything I can think of, but no luck. Is there another way of getting
> Storm to connect to a remote Node.js app? I have searched exhaustively
> online but cannot find anything. If there are some tutorials I could be
> pointed to, that would be great.
> Any help at all would be greatly appreciated.
>
> Kind Regards
> David
>
>
>


-- 

Warm Regards,
Brian Taylor

Resolving Architecture .:.

330-812-7098

br...@resolvingarchitecture.com

http://resolvingarchitecture.com

www.linkedin.com/in/javadevops/