Hi All,

I recently upgraded from HBase 0.94 / Hadoop1 to HBase 0.98 / Hadoop2,
which resulted in compilation errors in my Storm Trident topology.

I was using the following code to aggregate and store the state in HBase
(reference:
https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java):

TridentConfig config = new TridentConfig("shorturl", "shortid");
config.setBatch(false);
StateFactory state = HBaseAggregateState.transactional(config);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
        .each(new Fields("shortid", "date"), new DatePartitionFunction(), new Fields("cf", "cq"))
        .project(new Fields("shortid", "cf", "cq"))
        .groupBy(new Fields("shortid", "cf", "cq"))
        .persistentAggregate(state, new Count(), new Fields("count"));

After upgrading HBase I realized that some of the methods have changed,
so I moved to the following code:
TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()
                    .withColumnFamily("RKCSV")
                    .withColumnFields(new Fields("cq"))
                    .withCounterFields(new Fields("value1"))
                    .withRowKeyField("rowKey");

HBaseState.Options options = new HBaseState.Options()
                .withDurability(Durability.SYNC_WAL)
                .withMapper(tridentHBaseMapper)
                .withTableName("errSmryTbl_trident");

StateFactory factory = new HBaseStateFactory(options);
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts(zkHost);
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test.topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
Stream kafkaStream = topology.newStream("kafka", spout).shuffle();
Stream boltStream1 = kafkaStream.each(new Fields("str"),
        new InitializerBolt("RKCSV"), new Fields("rowKey", "cf", "cq", "value1"));
GroupedStream groupedStream1 = boltStream1.groupBy(new Fields("rowKey", "cf", "cq"));
groupedStream1.persistentAggregate(factory,
        new Fields("rowKey", "cf", "cq", "value1"), new ErrorInitBolt(), new Fields("value"));

But now I am facing a ClassCastException:

java.lang.RuntimeException: java.lang.ClassCastException: org.apache.storm.hbase.trident.state.HBaseState cannot be cast to storm.trident.state.map.MapState

Is there anything I am missing here? The aggregator that I am using is a
"CombinerAggregator<List<Map<String, String>>>".
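
For illustration, the exception message is consistent with an updater casting whatever state the factory produced to MapState, while HBaseState is not a MapState. The sketch below reproduces that shape with stand-in types only: State, MapState, PlainState, InMemoryMapState and addToKey are all hypothetical names written for this example, not the Storm or storm-hbase classes, and the real internals may differ.

```java
import java.util.HashMap;
import java.util.Map;

public class CastSketch {
    // Hypothetical stand-in for a generic Trident-style state marker type.
    interface State {}

    // Hypothetical stand-in for a keyed map state that an aggregation needs.
    interface MapState extends State {
        long get(String key);
        void put(String key, long value);
    }

    // Plays the role of a state that is a State but NOT a MapState.
    static class PlainState implements State {}

    // A minimal in-memory state that does satisfy the MapState contract.
    static class InMemoryMapState implements MapState {
        private final Map<String, Long> counts = new HashMap<>();
        public long get(String key) { return counts.getOrDefault(key, 0L); }
        public void put(String key, long value) { counts.put(key, value); }
    }

    // Mimics an updater that assumes it was handed a MapState.
    static long addToKey(State state, String key, long delta) {
        MapState map = (MapState) state; // ClassCastException for PlainState
        long updated = map.get(key) + delta;
        map.put(key, updated);
        return updated;
    }

    // Returns true when the cast inside addToKey fails for the given state.
    static boolean castFails(State state) {
        try {
            addToKey(state, "k", 1L);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(castFails(new PlainState()));       // prints "true"
        System.out.println(castFails(new InMemoryMapState())); // prints "false"
    }
}
```

If the same thing is happening in the topology above, the state factory passed to persistentAggregate would need to produce a MapState-backed state, or the write would need to go through a path that accepts a plain State; this is an assumption drawn from the exception text, not a confirmed diagnosis.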

Regards,
Nilesh Chhapru.
