Hi Chitra,
Were you able to run the topology without getting a ClassNotFoundException
on IMetric? Could you please let me know which jars you used?

Regards,
Kashyap


On Fri, Mar 7, 2014 at 2:39 AM, Chitra Raveendran <
[email protected]> wrote:

> I'm not able to run a normal storm-kafka topology without specifying the
> forceStartOffsetTime parameter. Without this parameter, the topology
> should start consuming from the last message's offset, right?
>
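> For reference, a minimal sketch of how the start position can be forced
> instead (the -1 / -2 special values below are an assumption based on
> kafka.api.OffsetRequest, not something taken from the topology):
>
> // Without forceStartOffsetTime, the spout should resume from the offset it
> // last committed to ZooKeeper under zkRoot/id. Forcing a start time on the
> // spoutConfig shown below overrides that:
> spoutConfig.forceStartOffsetTime(-1);  // -1 = LatestTime(): start at the newest message
> // spoutConfig.forceStartOffsetTime(-2); // -2 = EarliestTime(): replay from the beginning
>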
> The Kafka message is consumed as a byte array. For this I just commented
> out this line:
> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>
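> If it helps, the same raw-bytes behaviour can also be stated explicitly
> rather than by commenting the line out (assuming RawMultiScheme, which I
> believe is the KafkaConfig default and emits the payload as one byte[] field):
>
> import backtype.storm.spout.RawMultiScheme;
> kafkaConfig.scheme = new RawMultiScheme();  // emit the raw message bytes unchanged
>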
> *Consuming from the last message is critical, as I don't want to lose data
> if some systems go down unexpectedly! (This is rare and may never happen!
> Just being cautious :) )*
>
> Here is a snippet of my code:
>
> import java.util.ArrayList;
> import java.util.List;
>
> import storm.kafka.KafkaConfig.StaticHosts;
> import storm.kafka.KafkaSpout;
> import storm.kafka.SpoutConfig;
> import backtype.storm.Config;
> import backtype.storm.StormSubmitter;
> import backtype.storm.LocalCluster;
> import backtype.storm.topology.TopologyBuilder;
>
>
> public class MainTopology {
>     public static void main(String[] args) throws Exception {
>
>         // Kafka broker hosts for the static host configuration
>         List<String> hosts = new ArrayList<String>();
>         hosts.add("172.16.18.68");
>         hosts.add("172.16.18.69");
>
>         // spout config: static brokers, topic, ZooKeeper root and consumer id
>         SpoutConfig spoutConfig = new SpoutConfig(StaticHosts.fromHostString(hosts, 2), "topic", "/TOPIC", "ID");
>         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("kafka-spout", kafkaSpout, 2);
>         builder.setBolt("parserBolt", new MessageParserBolt(), 2).shuffleGrouping("kafka-spout");
>
> ---------------
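>
> The rest of the main method is omitted above; a typical remainder would
> look something like this (the topology name and worker count are only
> placeholders, not values from the original code):
>
>         // Build the topology and submit it: to the cluster when a name is
>         // given on the command line, otherwise to a LocalCluster for testing.
>         Config conf = new Config();
>         conf.setNumWorkers(2);  // placeholder worker count
>
>         if (args != null && args.length > 0) {
>             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
>         } else {
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("kafka-topology", conf, builder.createTopology());
>         }
>     }
> }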
>
> --
>
> Regards,
>
> *Chitra Raveendran*
>
