Hi, I didn't look into it, but it seems to use old or unmanaged module. Official extern module for HBase is here, https://github.com/apache/storm/tree/master/external/storm-hbase. It has been released with Storm 0.9.3 and onwards.
Please refer its README.md on Github repository, and below link to see all versions. http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.apache.storm%22%20AND%20a%3A%22storm-hbase%22 Hope this helps. Regards, Jungtaek Lim (HeartSaVioR) 2015-08-05 23:53 GMT+09:00 Nilesh Chhapru <[email protected]> : > Hi All, > > I have recently upgraded from HBase - 0.94 / Hadoop1 to HBase 0.98 / > Hadoop2, which resulted compilation errors in storm trident topology. > > I was using following code to aggregate and store the state to HBase > (*reference > : > https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java > <https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java>* > ) > > *TridentConfig config = new TridentConfig("shorturl", "shortid");* > *config.setBatch(false);* > *StateFactory state = HBaseAggregateState.transactional(config);* > *TridentTopology topology = new TridentTopology();* > *topology.newStream("spout", spout).each(new Fields("shortid", "date"), > new DatePartitionFunction(),new Fields("cf", "cq")).project(new > Fields("shortid", "cf", "cq")).groupBy(new Fields("shortid", "cf", > "cq")).persistentAggregate(state, new Count(), new Fields("count"));* > > Now since i have upgraded the HBase i realized that come of the methods > have undergone change hence moved to following code > *TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()* > * .withColumnFamily("RKCSV")* > * .withColumnFields(new Fields("cq"))* > * .withCounterFields(new Fields("value1"))* > * .withRowKeyField("rowKey");* > > *HBaseState.Options options = new HBaseState.Options()* > * .withDurability(Durability.SYNC_WAL)* > * .withMapper(tridentHBaseMapper)* > * .withTableName("errSmryTbl_trident");* > > > > > > > > > > > > *StateFactory factory = new HBaseStateFactory(options); TridentTopology > topology = new TridentTopology(); BrokerHosts zk = new ZkHosts(zkHost); > TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test.topic"); > spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); > OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf); > Stream kafkaStream = topology.newStream("kafka", spout).shuffle(); Stream > boltStream1 = kafkaStream.each(new Fields("str"), new > InitializerBolt("RKCSV"), new Fields("rowKey", "cf", "cq", "value1")); > GroupedStream groupedStream1 = boltStream1.groupBy(new Fields("rowKey", > "cf", "cq")); groupedStream1.persistentAggregate(factory, new > Fields("rowKey", "cf", "cq", "value1"), new ErrorInitBolt(), new > Fields("value")); * > But now i an facing class cast exception > > *java.lang.RuntimeException: java.lang.ClassCastException: > org.apache.storm.hbase.trident.state.HBaseState cannot be cast to > storm.trident.state.map.MapState *is there anything i m missing here? the > aggregator that i m using is "*CombinerAggregator<List<Map<String, > String>>>*". > > Regards, > Nilesh Chhapru. > -- Name : 임 정택 Blog : http://www.heartsavior.net / http://dev.heartsavior.net Twitter : http://twitter.com/heartsavior LinkedIn : http://www.linkedin.com/in/heartsavior
