Siddharth,
Kafka and Storm both scale as you add more nodes, although
150 msgs/sec is not much traffic for either of them.
From your config, you have 1 worker and a bolt parallelism
of 50, which seems very high for a single worker. I would start
by checking whether you can read those messages off Kafka at a
higher rate than 12 per sec. You can
try kafka-simple-consumer-perf-test.sh under the Kafka bin dir.
Then try reducing the parallelism hint for the bolts, or just
start a spout that only reads from Kafka and emits, and see how
many messages per sec it can do. If that is up to the mark, then
the issue might be in your bolt's execute method, and also in the
bolt parallelism being too high. Try the default config for
worker.childopts and add a few options at a time instead of
using the config you posted.
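As a starting point for that last suggestion, a pared-back storm.yaml entry might look like the sketch below. The -Xmx768m value is what I recall Storm's stock defaults.yaml shipping with, so please check it against your Storm version. The GC logging flags in the commented line are copied from your own config and are only an example of what to add back first.

```yaml
# Sketch only: start near the stock setting and measure throughput first.
worker.childopts: "-Xmx768m"
# Then re-add flags one small group at a time, re-measuring after each change,
# for example (flags taken from the posted config):
# worker.childopts: "-Xmx768m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
```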
-Harsha
On Tue, Jul 1, 2014, at 08:38 PM, Siddharth Banerjee wrote:
We are seeing some performance issues with Kafka + Storm +
Trident + OpaqueTridentKafkaSpout.
Our setup details are below.
Storm Topology :
Broker broker = Broker.fromString("localhost:9092")
GlobalPartitionInformation info = new GlobalPartitionInformation()
if (args[4]) {
    int partitionCount = args[4].toInteger()
    for (int i = 0; i < partitionCount; i++) {
        info.addPartition(i, broker)
    }
}
StaticHosts hosts = new StaticHosts(info)
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts, "test")
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig)
TridentTopology topology = new TridentTopology()
Stream st = topology.newStream("spout1", kafkaSpout).parallelismHint(args[2].toInteger())
    .each(kafkaSpout.getOutputFields(), new NEO4JTridentFunction(), new Fields("status"))
    .parallelismHint(args[1].toInteger())
Map conf = new HashMap()
conf.put(Config.TOPOLOGY_WORKERS, args[3].toInteger())
conf.put(Config.TOPOLOGY_DEBUG, false)
if (args[0] == "local") {
    LocalCluster cluster = new LocalCluster()
    cluster.submitTopology("mytopology", conf, topology.build())
} else {
    StormSubmitter.submitTopology("mytopology", conf, topology.build())
    NEO4JTridentFunction.getGraphDatabaseService().shutdown()
}
The storm.yaml we are using is below:
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "localhost"
# - "server2"
#
storm.zookeeper.port : 2999
storm.local.dir: "/opt/mphrx/neo4j/stormdatadir"
nimbus.childopts: "-Xms2048m"
ui.childopts: "-Xms1024m"
logviewer.childopts: "-Xmx512m"
supervisor.childopts: "-Xms1024m"
worker.childopts: "-Xms2600m -Xss256k -XX:MaxPermSize=128m -XX:PermSize=96m
  -XX:NewSize=1000m -XX:MaxNewSize=1000m -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
  -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
  -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
  -server -XX:+AggressiveOpts -XX:+UseCompressedOops -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
  -Xloggc:logs/gc-worker-%ID%.log -verbose:gc
  -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
  -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram
  -XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
  -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal"
java.library.path: "/usr/lib/jvm/jdk1.7.0_25"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
topology.trident.batch.emit.interval.millis: 100
topology.message.timeout.secs: 300
#topology.max.spout.pending: 10000
* Size of each message produced in Kafka : 11 KB
* Execution time of each bolt (NEO4JTridentFunction) to
  process the data : 500ms
* No. of Storm Workers : 1
* Parallelism hint for Spout (OpaqueTridentKafkaSpout) : 1
* Parallelism hint for Bolt/Function (NEO4JTridentFunction) : 50
* We are seeing throughput of around 12msgs/sec from Spout.
* Rate of messages produced in Kafka : 150msgs/sec
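A rough back-of-the-envelope check of the figures above, written as a small Java sketch. It assumes each NEO4JTridentFunction task processes one message at a time and that every call takes the full 500 ms; the class and method names are made up for illustration and are not part of Storm.

```java
public class ThroughputEstimate {

    // Upper bound on messages/sec when `tasks` parallel tasks each handle
    // one message at a time and every call takes `execMillis` milliseconds.
    public static double maxThroughput(int tasks, double execMillis) {
        return tasks * 1000.0 / execMillis;
    }

    // Average number of calls that must be in flight to sustain
    // `ratePerSec` messages/sec (Little's law: L = rate * time per call).
    public static double concurrencyFor(double ratePerSec, double execMillis) {
        return ratePerSec * execMillis / 1000.0;
    }

    public static void main(String[] args) {
        double execMillis = 500.0; // reported execution time per message

        System.out.println("Ceiling with 50 tasks (msgs/sec): "
                + maxThroughput(50, execMillis));
        System.out.println("Calls in flight at the observed 12 msgs/sec: "
                + concurrencyFor(12.0, execMillis));
        System.out.println("Calls in flight needed for 150 msgs/sec: "
                + concurrencyFor(150.0, execMillis));
    }
}
```

Under those assumptions, 50 tasks top out at about 100 msgs/sec, which is below the 150 msgs/sec being produced, and the observed 12 msgs/sec corresponds to only about 6 calls in flight on average. That would suggest most of the 50 tasks sit idle, so the limit is more likely upstream of the function (spout read rate or batch size) or in contention inside the function than in the raw task count.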
Both Storm and Kafka are single-node deployments. We have read
about much higher throughput from Storm but are unable to
reproduce it. Please suggest how to tune the Storm + Kafka +
OpaqueTridentKafkaSpout configuration to achieve higher
throughput. Any help in this regard would help us immensely.
Thanks for the help :)
Siddharth Banerjee
MphRx