We are seeing some performance issues with Kafka + Storm + Trident + OpaqueTridentKafkaSpout.

Our setup details are below.

Storm topology:

    // Command-line arguments:
    //   args[0] = "local" to run in a LocalCluster, anything else to submit to the cluster
    //   args[1] = parallelism hint for NEO4JTridentFunction
    //   args[2] = parallelism hint for the spout
    //   args[3] = number of Storm workers
    //   args[4] = number of partitions of the Kafka topic "test"
    Broker broker = Broker.fromString("localhost:9092")
    GlobalPartitionInformation info = new GlobalPartitionInformation()
    if (args[4]) {
        int partitionCount = args[4].toInteger()
        for (int i = 0; i < partitionCount; i++) {
            // Every partition is served by the single local broker
            info.addPartition(i, broker)
        }
    }
    StaticHosts hosts = new StaticHosts(info)
    TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts, "test")
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())

    OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig)
    TridentTopology topology = new TridentTopology()
    Stream st = topology.newStream("spout1", kafkaSpout)
            .parallelismHint(args[2].toInteger())
            .each(kafkaSpout.getOutputFields(), new NEO4JTridentFunction(), new Fields("status"))
            .parallelismHint(args[1].toInteger())

    Map conf = new HashMap()
    conf.put(Config.TOPOLOGY_WORKERS, args[3].toInteger())
    conf.put(Config.TOPOLOGY_DEBUG, false)

    if (args[0] == "local") {
        LocalCluster cluster = new LocalCluster()
        cluster.submitTopology("mytopology", conf, topology.build())
    } else {
        StormSubmitter.submitTopology("mytopology", conf, topology.build())
        NEO4JTridentFunction.getGraphDatabaseService().shutdown()
    }


The storm.yaml we are using is below:

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
     - "localhost"
#     - "server2"
#
storm.zookeeper.port : 2999


storm.local.dir: "/opt/mphrx/neo4j/stormdatadir"

nimbus.childopts: "-Xms2048m"
ui.childopts: "-Xms1024m"
logviewer.childopts: "-Xmx512m"
supervisor.childopts: "-Xms1024m"
worker.childopts: "-Xms2600m -Xss256k -XX:MaxPermSize=128m -XX:PermSize=96m
    -XX:NewSize=1000m -XX:MaxNewSize=1000m -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
    -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
    -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
    -server -XX:+AggressiveOpts -XX:+UseCompressedOops -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
    -Xloggc:logs/gc-worker-%ID%.log -verbose:gc
    -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
    -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram
    -XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
    -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal"

java.library.path: "/usr/lib/jvm/jdk1.7.0_25"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

topology.trident.batch.emit.interval.millis: 100
topology.message.timeout.secs: 300
#topology.max.spout.pending: 10000
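For reference, the heap layout implied by worker.childopts can be worked out from the flags themselves. The sketch below assumes the heap stays at its -Xms size of 2600 MB (no -Xmx is set, so the real maximum may differ) and uses the HotSpot rule that -XX:SurvivorRatio is the ratio of eden to one survivor space; HotSpot also rounds sizes to its alignment, so the figures are approximate. The class and method names are hypothetical:

```java
// Hypothetical helper: heap regions implied by the worker.childopts flags.
// Assumes the heap stays at -Xms2600m; HotSpot's alignment rounding is ignored.
public class WorkerHeapLayout {

    // -XX:SurvivorRatio = eden / one survivor space, and young = eden + 2 survivors.
    static long survivorMb(long youngMb, int survivorRatio) {
        return youngMb / (survivorRatio + 2);
    }

    static long edenMb(long youngMb, int survivorRatio) {
        return youngMb - 2 * survivorMb(youngMb, survivorRatio);
    }

    static long oldMb(long heapMb, long youngMb) {
        return heapMb - youngMb;
    }

    // Old-generation occupancy at which a CMS cycle starts; with
    // -XX:+UseCMSInitiatingOccupancyOnly this fraction is the sole trigger.
    static long cmsTriggerMb(long heapMb, long youngMb, int fractionPercent) {
        return oldMb(heapMb, youngMb) * fractionPercent / 100;
    }

    public static void main(String[] args) {
        long heapMb = 2600;  // -Xms2600m
        long youngMb = 1000; // -XX:NewSize=1000m -XX:MaxNewSize=1000m
        int ratio = 6;       // -XX:SurvivorRatio=6
        int cms = 75;        // -XX:CMSInitiatingOccupancyFraction=75

        System.out.println("eden:      " + edenMb(youngMb, ratio) + " MB");
        System.out.println("survivors: 2 x " + survivorMb(youngMb, ratio) + " MB");
        System.out.println("old gen:   " + oldMb(heapMb, youngMb) + " MB");
        System.out.println("CMS start: " + cmsTriggerMb(heapMb, youngMb, cms) + " MB");
    }
}
```

Under those assumptions this gives roughly 750 MB of eden, two 125 MB survivor spaces and 1600 MB of old generation, with CMS starting once the old generation passes about 1200 MB.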


  *   Size of each message produced in Kafka: 11 KB
  *   Execution time of the bolt (NEO4JTridentFunction) to process the data: 500 ms
  *   No. of Storm workers: 1
  *   Parallelism hint for the spout (OpaqueTridentKafkaSpout): 1
  *   Parallelism hint for the bolt/function (NEO4JTridentFunction): 50
  *   Observed throughput from the spout: around 12 msgs/sec
  *   Rate of messages produced in Kafka: 150 msgs/sec
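As a rough sanity check on these numbers, here is a back-of-the-envelope calculation. It assumes the 500 ms is spent per message and that each of the 50 NEO4JTridentFunction tasks handles one message at a time and is kept fully busy, which is an idealisation. The class and method names are hypothetical:

```java
// Hypothetical back-of-the-envelope check; all inputs are the figures quoted above.
// Assumes 500 ms per message and that every function task is busy all the time.
public class ThroughputCeiling {

    // Upper bound on msgs/sec when `tasks` tasks each take `msPerMessage` per message.
    static double ceilingMsgsPerSec(int tasks, double msPerMessage) {
        return tasks * (1000.0 / msPerMessage);
    }

    // Smallest task count whose ceiling reaches `targetMsgsPerSec`.
    static int tasksNeeded(double targetMsgsPerSec, double msPerMessage) {
        return (int) Math.ceil(targetMsgsPerSec * msPerMessage / 1000.0);
    }

    // Incoming data rate in KB/sec.
    static double ingestKbPerSec(double kbPerMessage, double msgsPerSec) {
        return kbPerMessage * msgsPerSec;
    }

    public static void main(String[] args) {
        int functionTasks = 50;        // parallelism hint for NEO4JTridentFunction
        double msPerMessage = 500.0;   // reported execution time
        double producedPerSec = 150.0; // Kafka production rate
        double observedPerSec = 12.0;  // observed spout throughput
        double kbPerMessage = 11.0;    // message size

        double ceiling = ceilingMsgsPerSec(functionTasks, msPerMessage);
        System.out.printf("Ceiling with %d tasks: %.1f msgs/sec%n", functionTasks, ceiling);
        System.out.printf("Tasks needed for %.0f msgs/sec: %d%n",
                producedPerSec, tasksNeeded(producedPerSec, msPerMessage));
        System.out.printf("Kafka ingest: %.0f KB/sec%n", ingestKbPerSec(kbPerMessage, producedPerSec));
        System.out.printf("Observed as share of ceiling: %.0f%%%n", 100.0 * observedPerSec / ceiling);
    }
}
```

Under these assumptions the function stage tops out at about 100 msgs/sec, below the 150 msgs/sec being produced (about 75 tasks would be needed to keep up), and the observed 12 msgs/sec is roughly 12% of that ceiling.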

Both Storm and Kafka are single-node deployments. We have read about much
higher throughput figures for Storm but have been unable to reproduce them.
Please suggest how to tune the Storm + Kafka + OpaqueTridentKafkaSpout
configuration to achieve higher throughput. Any help in this regard would be
greatly appreciated.
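To see how the Trident batching settings interact with the 500 ms processing time, here is a rough bound on the batch rate. It assumes (1) a new batch starts at most once per topology.trident.batch.emit.interval.millis, (2) at most topology.max.spout.pending batches are in flight at once (Trident applies this limit to batches rather than individual tuples), and (3) every batch takes at least one 500 ms NEO4JTridentFunction call to complete. The pending values tried are hypothetical, since that setting is commented out in our storm.yaml, and the class and method names are our own:

```java
// Hypothetical rough bound on Trident batch rate, under three assumptions:
//  - a new batch can start at most once per emit interval
//  - at most `pendingBatches` batches are in flight at once
//  - every batch takes at least `minBatchLatencyMs` to complete
public class BatchRateBound {

    static double maxBatchesPerSec(long emitIntervalMs, int pendingBatches, double minBatchLatencyMs) {
        double byInterval = 1000.0 / emitIntervalMs;
        double byPending = pendingBatches * 1000.0 / minBatchLatencyMs;
        return Math.min(byInterval, byPending);
    }

    // Average batch size required to sustain `targetMsgsPerSec`.
    static double msgsPerBatchNeeded(double targetMsgsPerSec, double batchesPerSec) {
        return targetMsgsPerSec / batchesPerSec;
    }

    public static void main(String[] args) {
        long emitIntervalMs = 100;  // topology.trident.batch.emit.interval.millis
        double minLatencyMs = 500;  // assumed: at least one 500 ms function call per batch
        double target = 150;        // Kafka production rate, msgs/sec
        for (int pending : new int[] {1, 5, 10}) { // hypothetical max.spout.pending values
            double bps = maxBatchesPerSec(emitIntervalMs, pending, minLatencyMs);
            System.out.printf("pending=%d: <= %.1f batches/sec, >= %.1f msgs/batch for %.0f msgs/sec%n",
                    pending, bps, msgsPerBatchNeeded(target, bps), target);
        }
    }
}
```

With one batch in flight the bound is 2 batches/sec, so each batch would have to carry about 75 messages to keep pace with 150 msgs/sec; from 5 pending batches upward the 100 ms emit interval becomes the limit at 10 batches/sec, or about 15 messages per batch.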

Thanks for the help :)

Siddharth Banerjee
MphRx
