I'm not able to run a normal storm-kafka topology without specifying the
forceStartOffsetTime parameter. Without this parameter, shouldn't the
topology start consuming from the last message's offset?
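To be explicit about the workaround I'm currently forced into, this is roughly the call I have to add to make the topology run at all (a sketch; my understanding is that -1 corresponds to Kafka's LatestTime and -2 to EarliestTime, please correct me if that's wrong):

    // What I currently have to do to get the topology running
    // (sketch; I believe -1 maps to kafka.api.OffsetRequest.LatestTime(),
    //  i.e. "start from the newest message", and -2 would replay from the earliest offset)
    spoutConfig.forceStartOffsetTime(-1);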
The Kafka messages are consumed as byte arrays; for this I just commented out
this line:
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
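(In case it matters: I think the same raw-bytes behaviour could also be set explicitly instead of relying on whatever default applies when that line is removed. Treat this as a sketch, I haven't verified that RawMultiScheme is available in this storm-kafka version:)

    import backtype.storm.spout.RawMultiScheme;

    // make the raw-bytes behaviour explicit: each Kafka message is emitted
    // as a one-field tuple holding the untouched byte array
    spoutConfig.scheme = new RawMultiScheme();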
*Consuming from the last message is critical, as I don't want to lose data
if some system goes down unexpectedly! (This is rare and may never happen!
Just being cautious :) )*
Here is a snippet of my code:
import java.util.ArrayList;
import java.util.List;

import storm.kafka.KafkaConfig.StaticHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class MainTopology {
    public static void main(String[] args) throws Exception {
        // static list of Kafka broker hosts
        List<String> hosts = new ArrayList<String>();
        hosts.add("172.16.18.68");
        hosts.add("172.16.18.69");

        // topic "topic"; offsets are tracked in ZooKeeper under "/TOPIC" with consumer id "ID"
        SpoutConfig spoutConfig =
                new SpoutConfig(StaticHosts.fromHostString(hosts, 2), "topic", "/TOPIC", "ID");
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", kafkaSpout, 2);
        builder.setBolt("parserBolt", new MessageParserBolt(), 2)
               .shuffleGrouping("kafka-spout");
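The snippet stops there; the rest of my main method is just the usual submit / local-cluster boilerplate, roughly as below (the topology name and worker count here are placeholders):

        Config conf = new Config();
        conf.setNumWorkers(2);  // placeholder worker count

        if (args != null && args.length > 0) {
            // run on the cluster when a topology name is passed on the command line
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // otherwise run in-process for local testing
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("main-topology", conf, builder.createTopology());
        }
    }
}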
--
Regards,
*Chitra Raveendran*