Hi All,

I recently upgraded from HBase 0.94 / Hadoop 1 to HBase 0.98 / Hadoop 2, which resulted in compilation errors in my Storm Trident topology.
I was using the following code to aggregate and store the state in HBase (reference: https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java):

    TridentConfig config = new TridentConfig("shorturl", "shortid");
    config.setBatch(false);
    StateFactory state = HBaseAggregateState.transactional(config);

    TridentTopology topology = new TridentTopology();
    topology.newStream("spout", spout)
        .each(new Fields("shortid", "date"), new DatePartitionFunction(), new Fields("cf", "cq"))
        .project(new Fields("shortid", "cf", "cq"))
        .groupBy(new Fields("shortid", "cf", "cq"))
        .persistentAggregate(state, new Count(), new Fields("count"));

After upgrading HBase I realized that some of the methods have changed, so I moved to the following code:

    TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()
        .withColumnFamily("RKCSV")
        .withColumnFields(new Fields("cq"))
        .withCounterFields(new Fields("value1"))
        .withRowKeyField("rowKey");

    HBaseState.Options options = new HBaseState.Options()
        .withDurability(Durability.SYNC_WAL)
        .withMapper(tridentHBaseMapper)
        .withTableName("errSmryTbl_trident");

    StateFactory factory = new HBaseStateFactory(options);

    TridentTopology topology = new TridentTopology();

    BrokerHosts zk = new ZkHosts(zkHost);
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test.topic");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

    Stream kafkaStream = topology.newStream("kafka", spout).shuffle();
    Stream boltStream1 = kafkaStream.each(new Fields("str"), new InitializerBolt("RKCSV"),
        new Fields("rowKey", "cf", "cq", "value1"));
    GroupedStream groupedStream1 = boltStream1.groupBy(new Fields("rowKey", "cf", "cq"));
    groupedStream1.persistentAggregate(factory, new Fields("rowKey", "cf", "cq", "value1"),
        new ErrorInitBolt(), new Fields("value"));

But now I am facing a class cast exception:
    java.lang.RuntimeException: java.lang.ClassCastException: org.apache.storm.hbase.trident.state.HBaseState cannot be cast to storm.trident.state.map.MapState

Is there anything I am missing here? The aggregator I am using is a CombinerAggregator<List<Map<String, String>>>.

Regards,
Nilesh Chhapru
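
For reference, the shape of the failure can be reproduced without Storm or HBase. The sketch below is only an illustration under stated assumptions: State, MapState, PlainState and InMemoryMapState are simplified stand-ins written for this example, not the real Trident or storm-hbase classes. It assumes, as the stack trace suggests, that a grouped persistentAggregate treats the state built by the factory as a MapState, so a state that implements only State fails at the cast.

```java
import java.util.ArrayList;
import java.util.List;

class StateCastSketch {
    // Hypothetical, simplified stand-in for storm.trident.state.State.
    interface State {}

    // Hypothetical, simplified stand-in for storm.trident.state.map.MapState.
    interface MapState<T> extends State {
        List<T> multiGet(List<List<Object>> keys);
    }

    // Plays the role of a state that implements only State,
    // like HBaseState in the stack trace above.
    static class PlainState implements State {}

    // Plays the role of a state that also implements MapState.
    static class InMemoryMapState implements MapState<Long> {
        @Override
        public List<Long> multiGet(List<List<Object>> keys) {
            List<Long> out = new ArrayList<>();
            for (int i = 0; i < keys.size(); i++) {
                out.add(0L); // one placeholder value per requested key
            }
            return out;
        }
    }

    // Mimics the assumed cast performed when aggregating over a grouped stream.
    @SuppressWarnings("unchecked")
    static MapState<Long> asMapState(State s) {
        return (MapState<Long>) s; // throws ClassCastException for PlainState
    }

    // Returns true when the given state cannot be used as a MapState.
    static boolean castFails(State s) {
        try {
            asMapState(s);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(castFails(new PlainState()));       // prints true
        System.out.println(castFails(new InMemoryMapState())); // prints false
    }
}
```

If that assumption holds, the exception would come from the kind of state the factory produces rather than from the aggregator itself.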
