The KafkaSpout commits offsets to ZooKeeper, by default the Storm cluster's ZooKeeper. You can find the path in PartitionManager.committedPath(). Perhaps you can take a look at what's in ZooKeeper and see if something's off.
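As a rough sketch of where to look: the committed path is typically composed from the zkRoot and id you pass to SpoutConfig, plus a per-partition segment (the exact partition-id format varies across storm-kafka versions, e.g. a bare number or "partition_N" -- check PartitionManager.committedPath() in your version). The helper below is a hypothetical mirror of that composition, not the actual storm-kafka code:

```java
public class CommittedPathSketch {

    // Hypothetical mirror of PartitionManager.committedPath():
    // zkRoot + "/" + spout id + "/" + partition id.
    // The partitionId format here is an assumption; verify against your storm-kafka version.
    static String committedPath(String zkRoot, String spoutId, String partitionId) {
        return zkRoot + "/" + spoutId + "/" + partitionId;
    }

    public static void main(String[] args) {
        // With SpoutConfig(..., "topic", "/TOPic".replace("ic", "IC"), "ID") as in the
        // snippet below (zkRoot "/TOPIC", id "ID"), partition 0 would land somewhere like:
        System.out.println(committedPath("/TOPIC", "ID", "0"));
    }
}
```

You can then inspect that node with the ZooKeeper CLI (zkCli.sh), e.g. `ls /TOPIC/ID` and `get` on the child nodes, to see whether an offset was actually committed.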
On Fri, Mar 7, 2014 at 12:39 AM, Chitra Raveendran <[email protected]> wrote:

> I'm not able to run a normal storm-kafka topology without specifying the
> forceStartOffsetTime parameter. Without this parameter, the topology
> should start consuming from the last message's offset, right?
>
> The Kafka message is consumed as a byte array. For this I just commented
> out this line:
>
>     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>
> *Consuming from the last message is critical, as I don't want to lose out
> on the data if some systems go down unexpectedly! (This is rare and may
> never happen! Just being cautious :) )*
>
> Here is a snippet of my code:
>
>     import java.util.ArrayList;
>     import java.util.List;
>
>     import storm.kafka.KafkaConfig.StaticHosts;
>     import storm.kafka.KafkaSpout;
>     import storm.kafka.SpoutConfig;
>     import backtype.storm.Config;
>     import backtype.storm.StormSubmitter;
>     import backtype.storm.LocalCluster;
>     import backtype.storm.topology.TopologyBuilder;
>
>     public class MainTopology {
>         public static void main(String[] args) throws Exception {
>
>             List<String> hosts = new ArrayList<String>();
>             hosts.add("172.16.18.68");
>             hosts.add("172.16.18.69");
>
>             SpoutConfig spoutConfig = new SpoutConfig(
>                     StaticHosts.fromHostString(hosts, 2), "topic", "/TOPIC", "ID");
>             KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
>             TopologyBuilder builder = new TopologyBuilder();
>             builder.setSpout("kafka-spout", kafkaSpout, 2);
>             builder.setBolt("parserBolt", new MessageParserBolt(), 2)
>                     .shuffleGrouping("kafka-spout");
>
> ---------------
>
> --
>
> Regards,
>
> *Chitra Raveendran*

--
Lin Zhao
https://wiki.groupondev.com/Message_Bus
3101 Park Blvd, Palo Alto, CA 94306
