I'm not able to run a normal storm-kafka topology without specifying the
forceStartOffsetTime parameter. Without this parameter, the topology should
start consuming from the offset it last committed, right?
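
Right now the only way I can get it running is by forcing the start offset,
roughly like this (a minimal sketch, class name just for illustration; I'm
assuming the KafkaConfig.forceStartOffsetTime(long) helper from storm-contrib's
storm-kafka, where -1 should map to the latest offset and -2 to the earliest):

import java.util.Arrays;

import storm.kafka.KafkaConfig.StaticHosts;
import storm.kafka.SpoutConfig;

public class ForcedOffsetExample {
    public static void main(String[] args) {
        SpoutConfig spoutConfig = new SpoutConfig(
                StaticHosts.fromHostString(Arrays.asList("172.16.18.68", "172.16.18.69"), 2),
                "topic", "/TOPIC", "ID");

        // Force the spout to ignore any offset stored in ZooKeeper and start
        // from the latest message; -2 would start from the earliest instead.
        spoutConfig.forceStartOffsetTime(-1);
    }
}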

The Kafka message is consumed as a byte array. For this, I just commented out
this line:
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

*Consuming from the last message is critical, as I don't want to lose out
on the data if some systems go down unexpectedly! (This is rare and may
never happen! Just being cautious :) )*

Here is a snippet of my code:

import java.util.ArrayList;
import java.util.List;

import storm.kafka.KafkaConfig.StaticHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;


public class MainTopology {
    public static void main(String[] args) throws Exception {

        List<String> hosts = new ArrayList<String>();
        hosts.add("172.16.18.68");
        hosts.add("172.16.18.69");

        // Spout config: static broker list (2 partitions per host), topic, ZK root, consumer id
        SpoutConfig spoutConfig = new SpoutConfig(StaticHosts.fromHostString(hosts, 2), "topic", "/TOPIC", "ID");
        // Scheme left unset so the messages come through as raw byte arrays:
        // spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", kafkaSpout, 2);
        builder.setBolt("parserBolt", new MessageParserBolt(), 2).shuffleGrouping("kafka-spout");
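
In case it matters, here is a stripped-down sketch of the kind of bolt I mean
on the parsing side (the actual MessageParserBolt does more; I'm assuming the
spout emits the raw Kafka message as a single byte[] field that can be read
with tuple.getBinary(0) and decoded as UTF-8):

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.nio.charset.Charset;

public class MessageParserBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // The spout emits the raw Kafka message as a byte array.
        byte[] bytes = tuple.getBinary(0);

        // Decode and pass the message downstream; the real bolt parses it further.
        String message = new String(bytes, Charset.forName("UTF-8"));
        collector.emit(new Values(message));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}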

-- 

Regards,

*Chitra Raveendran*
