Is there a way to test the throughput of the Trident KafkaSpout, say, how many
messages it consumes per second?
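One rough way to measure this is to call a small rate counter from an each()/filter placed right after the spout and log messages per second. A minimal sketch below; the ThroughputCounter class is a hypothetical helper (not part of storm-kafka) and needs no Storm dependency:

```java
// Hypothetical helper for measuring spout throughput: call mark() once per
// tuple (e.g. from a Trident filter or function right after the spout) and
// it reports the rate each time a one-second window closes.
public class ThroughputCounter {
    private long windowStart = System.currentTimeMillis();
    private long count = 0;

    /** Returns the msg/sec rate when a 1s window closes, or -1 while still counting. */
    public synchronized long mark() {
        count++;
        long now = System.currentTimeMillis();
        long elapsedMs = now - windowStart;
        if (elapsedMs < 1000) {
            return -1; // window not yet closed
        }
        long rate = count * 1000 / elapsedMs; // normalize to per-second
        System.out.println("throughput: " + rate + " msg/sec");
        windowStart = now;
        count = 0;
        return rate;
    }
}
```

Comparing the reported rate with and without the downstream stages (parse, groupBy, persistentAggregate) should isolate how much of the latency is the spout itself.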

thanks

Alec

On Tue, Oct 28, 2014 at 5:15 PM, Sa Li <[email protected]> wrote:

> Thanks a lot Sai, I tried topology.newStream("topictestspout",
> kafkaSpout).parallelismHint(4~36), but it didn't help much. I did see that
> the actual fetch size is only 5 messages, which is far below what I
> expected; the messages from Kafka are JSON, around 3.5 KB each. Below is my
> code:
>
>     BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
>     TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>     spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>     spoutConf.fetchSizeBytes = 50*1024*1024;
>     spoutConf.bufferSizeBytes = 50*1024*1024;
>     OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
>     TridentTopology topology = new TridentTopology();
>
>     final PostgresqlStateConfig config = new PostgresqlStateConfig();
>     config.setUrl(dburl);
>     config.setTable("test.state");
>     config.setKeyColumns(new String[]{"userid"});
>     config.setValueColumns(new String[]{"event"});
>     config.setType(StateType.NON_TRANSACTIONAL);
>     config.setCacheSize(5000);
>
>     topology.newStream("topictestspout", kafkaSpout)
>             // topology.newStream("test", new RandomTupleSpout())
>             // (this comparison suggests the kafkaSpout overhead causes the latency)
>             .parallelismHint(36)
>             // .shuffle()
>             // .each(new Fields("batchid","word"), ...)
>             .each(new Fields("str"),
>                   new JsonObjectParse(),
>                   new Fields("userid","event"))
>             .groupBy(new Fields("userid"))
>             .persistentAggregate(PostgresqlState.newFactory(config),
>                   new Fields("userid","event"),
>                   new EventUpdater(),
>                   new Fields("eventword"));
>             // .parallelismHint(32);
>
>
> However, the link you sent in your last email looks useful for tuning the
> Trident parameters; I will go through that page and see if it helps. In
> general, the throughput should be much larger than this: I ran the consumer
> performance test on the 3-node (9-broker) Kafka cluster, and the throughput
> averaged 350K msg/sec with messages of around 3~4 KB each. So I am not sure
> whether such long latency is overhead caused by Storm or by something else.
>
>
> thanks
>
> Alec
>
>
> On Tue, Oct 28, 2014 at 3:59 PM, saiprasad mishra <
> [email protected]> wrote:
>
>> Something like the below; my bad for the last quick email.
>>
>> topology.newStream("topictestspout", kafkaSpout).parallelismHint(4)
>>
>> Regards
>> Sai
>>
>>
>> On Tue, Oct 28, 2014 at 3:48 PM, saiprasad mishra <
>> [email protected]> wrote:
>>
>>>
>>> You can increase the parallelismHint while creating the stream.
>>>
>>> You can also take a look at this for some more explanation
>>>
>>> https://gist.github.com/mrflip/5958028
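>>> The gist above mostly turns the same few knobs. A sketch of them below (config names from the Storm 0.9.x era; worth verifying against your version). Together with the spout's own fetchSizeBytes, these bound how large a Trident batch from the Kafka spout can get:

```java
// Config fragment (assumes backtype.storm.Config on the classpath):
Config conf = new Config();
// Cap on in-flight batches per spout; raising it increases pipelining.
conf.setMaxSpoutPending(20);
// How often Trident emits a new batch (ms); lower means smaller,
// more frequent batches.
conf.put("topology.trident.batch.emit.interval.millis", 500);
// Per-partition batch size is also capped by the spout's own setting,
// spoutConf.fetchSizeBytes, which bounds one Kafka fetch.
```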
>>>
>>>
>>> Regards
>>> Sai
>>>
>>> On Tue, Oct 28, 2014 at 3:35 PM, Sa Li <[email protected]> wrote:
>>>
>>>> Thanks Sai, I set spoutConf.fetchSizeBytes = 500*1024*1024, but it didn't
>>>> help, so I assume there are some other unresolved issues.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> Alec
>>>>
>>>> On Tue, Oct 28, 2014 at 3:18 PM, saiprasad mishra <
>>>> [email protected]> wrote:
>>>>
>>>>> You can configure how much to fetch from Kafka, in case you have not
>>>>> already set something like the config below:
>>>>>
>>>>> spoutConf.fetchSizeBytes = 10*1024*1024; // bytes per fetch; the default
>>>>> is 1024*1024, I guess
>>>>>
>>>>>
>>>>> Regards
>>>>>
>>>>> Sai
>>>>>
>>>>> On Tue, Oct 28, 2014 at 2:59 PM, Sa Li <[email protected]> wrote:
>>>>>
>>>>>> Hi, all
>>>>>>
>>>>>> I have a question about TridentKafkaSpout. I am using TridentKafkaSpout
>>>>>> to consume data from a Kafka cluster, and I found the latency of the
>>>>>> kafkaSpout is too high to be acceptable. Here is the code:
>>>>>>
>>>>>>     BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
>>>>>>     TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>>>>>>     spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>     OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
>>>>>>     TridentTopology topology = new TridentTopology();
>>>>>>
>>>>>> I use two ways to create the stream: one uses the KafkaSpout to read
>>>>>> data from Kafka, the other uses a RandomTupleSpout to generate tuples
>>>>>> randomly.
>>>>>>
>>>>>>     topology.newStream("topictestspout", kafkaSpout)
>>>>>>     // topology.newStream("test", new RandomTupleSpout())
>>>>>>             .persistentAggregate(PostgresqlState.newFactory(config),
>>>>>>                   new Fields("userid","event"),
>>>>>>                   new EventUpdater(),
>>>>>>                   new Fields("eventword"));
>>>>>>
>>>>>>
>>>>>> I am expecting to process the batched data in the State, but I found
>>>>>> that the list size of messages is at most 4 when using the kafkaSpout,
>>>>>> no matter how large a batch size I set, while with RandomTupleSpout the
>>>>>> actual batch size in memory can reach a few hundred (still not very
>>>>>> high). So I conclude that the KafkaSpout consumes data too slowly. Is
>>>>>> that because something is wrong with my code, or is there something I
>>>>>> need to change in the Kafka/Storm cluster configuration? Currently I am
>>>>>> using a 3-node (9-broker) Kafka cluster, a single-node Storm cluster,
>>>>>> and a 3-node ZK ensemble. The above code runs in local mode.
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>> Alec
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
