Thanks a lot Sai, I tried topology.newStream("topictestspout",
kafkaSpout).parallelismHint(4~36),
it wouldn't help a lot, i did see the actual fetch message size is 5, that
is far below what I expected, the message size from Kafka is in the form of
json, around 3.5k/msg. Below is my code

                BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
                TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk,
"topictest");
                spoutConf.scheme = new SchemeAsMultiScheme(new
StringScheme());
                spoutConf.fetchSizeBytes = 50*1024*1024;
                spoutConf.bufferSizeBytes = 50*1024*1024;
                OpaqueTridentKafkaSpout kafkaSpout = new
OpaqueTridentKafkaSpout(spoutConf);
                TridentTopology topology = new TridentTopology();

                final PostgresqlStateConfig config = new
PostgresqlStateConfig();
                {
                      config.setUrl(dburl);
                      config.setTable("test.state");
                      config.setKeyColumns(new String[]{"userid"});
                      config.setValueColumns(new String[]{"event"});
                      config.setType(StateType.NON_TRANSACTIONAL);
                      config.setCacheSize(5000);
                }

                 topology.newStream("topictestspout", kafkaSpout)
//                  topology.newStream("test", new RandomTupleSpout())
   // this test tells the kafkaSpout has the overhead to cause the latency
                                      .parallelismHint(36)
//                                      .shuffle()
//                                      .each(new Fields("batchid","word"),
                                      .each(new Fields("str"),
                                            new JsonObjectParse(),
                                            new Fields("userid","event"))
                                      .groupBy(new Fields("userid"))

.persistentAggregate(PostgresqlState.newFactory(config), new
Fields("userid","event"), new EventUpdater(), new Fields( "eventword"));
//                                      .parallelismHint(32);


However the link you send in the last email looks good to tune the trident
parameters, I will go through that page see if it helps. In general, the
throughput should way larger than this, I did the consumer performance test
on the 3-node (9 brokers) kafka cluster, the throughput was averagely
350Kmsg/sec,  and each message is around 3~4K. So I am not sure such long
latency is the overhead caused by Storm or others.


thanks

Alec


On Tue, Oct 28, 2014 at 3:59 PM, saiprasad mishra <[email protected]
> wrote:

> Something like below
> my bad for the last quick email
>
> topology.newStream("topictestspout", kafkaSpout).parallelismHint(4)
>
> Regards
> Sai
>
>
> On Tue, Oct 28, 2014 at 3:48 PM, saiprasad mishra <
> [email protected]> wrote:
>
>>
>> you can increase the parallelismHint while creating the stream
>>
>> You can also take a look at this for some more explanation
>>
>> https://gist.github.com/mrflip/5958028
>>
>>
>> Regards
>> Sai
>>
>> On Tue, Oct 28, 2014 at 3:35 PM, Sa Li <[email protected]> wrote:
>>
>>> Thanks Sai, I set spoutConf.fetchSizeBytes = 500*1024*1024; it didn't
>>> help, I assume there might be some other issues unresolved.
>>>
>>> THanks
>>>
>>>
>>> Alec
>>>
>>> On Tue, Oct 28, 2014 at 3:18 PM, saiprasad mishra <
>>> [email protected]> wrote:
>>>
>>>> You can configure how much to fetch from kafka incase you have not done
>>>> the below like config
>>>>
>>>> spoutConf.fetchSizeBytes = 10*1024*1024; // these many bytes default
>>>> is 1024*1024 i guess
>>>>
>>>>
>>>> Regards
>>>>
>>>> Sai
>>>>
>>>> On Tue, Oct 28, 2014 at 2:59 PM, Sa Li <[email protected]> wrote:
>>>>
>>>>> Hi, all
>>>>>
>>>>> I have a question about TridentKafkaSpout, I am using
>>>>> TridentKafkaSpout to consume data from
>>>>> Kafka cluster, I found the latency of  kafkaSpout is too slow to
>>>>> acceptable, see this is the code
>>>>>
>>>>> BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
>>>>> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>>>>> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>> OpaqueTridentKafkaSpout kafkaSpout = new
>>>>> OpaqueTridentKafkaSpout(spoutConf);
>>>>> TridentTopology topology = new TridentTopology();
>>>>>
>>>>> I use two ways to create stream, one is KafkaSpout to read data from
>>>>> kafka, one is RandomTupleSpout to generate tuple randomly.
>>>>>
>>>>> topology.newStream("topictestspout", kafkaSpout)
>>>>> //topology.newStream("test", new RandomTupleSpout())
>>>>>
>>>>> .persistentAggregate(PostgresqlState.newFactory(config), new
>>>>> Fields("userid","event"), new EventUpdater(), new Fields( "eventword"))
>>>>>
>>>>>
>>>>> I am expecting to process the batch data in State, but I found list
>>>>> size of messages is up to 4 by using kafkaSpout no matter how large the
>>>>> batch size I set, but with RandomTupleSpout(), the actual batch size in
>>>>> memory can reach few hundreds (not very high), so I am concluding the
>>>>> KafkaSpout consume data too slow, it that because something wrong with my
>>>>> code, or something I need to change in Kafka/storm cluster configurations.
>>>>> Currently I am using 3-nodes (9 brokers) kafka cluster, single node storm
>>>>> cluster, 3-node zk ensemble. Above code is in the local mode.
>>>>>
>>>>> thanks
>>>>>
>>>>> Alec
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to