I'm sorry I misunderstood you. persistentAggregate handles grouped stream, so you need to use HBaseMapState and HBaseMapState.Options instead of HBaseState and HBaseState.Options.
You can refer two links for more details. a) http://storm.apache.org/documentation/Trident-API-Overview.html - "Operations on grouped streams" section b) http://storm.apache.org/documentation/Trident-state.html - "persistentAggregate" section Regards, Jungtaek Lim (HeartSaVioR) 2015-08-06 16:15 GMT+09:00 Nilesh Chhapru <[email protected]> : > Hi , > > I have implemented the same "ptgoetz <https://github.com/ptgoetz>" code > which is giving me the exception. > > Do you or anyone have used the persistanceAggregate with trident using new > version of HBase. > > Regards, > Nilesh Chhapru. > > > On Thursday 06 August 2015 03:35 AM, 임정택 wrote: > > Hi, > > I didn't look into it, but it seems to use old or unmanaged module. > Official extern module for HBase is here, > https://github.com/apache/storm/tree/master/external/storm-hbase. > It has been released with Storm 0.9.3 and onwards. > > Please refer its README.md on Github repository, and below link to see all > versions. > > http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.apache.storm%22%20AND%20a%3A%22storm-hbase%22 > > Hope this helps. > > Regards, > Jungtaek Lim (HeartSaVioR) > > > 2015-08-05 23:53 GMT+09:00 Nilesh Chhapru < > [email protected]>: > >> Hi All, >> >> I have recently upgraded from HBase - 0.94 / Hadoop1 to HBase 0.98 / >> Hadoop2, which resulted compilation errors in storm trident topology. >> >> I was using following code to aggregate and store the state to HBase >> (*reference >> : >> https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java >> <https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java>* >> ) >> >> *TridentConfig config = new TridentConfig("shorturl", "shortid");* >> *config.setBatch(false);* >> *StateFactory state = HBaseAggregateState.transactional(config);* >> *TridentTopology topology = new TridentTopology();* >> *topology.newStream("spout", spout).each(new Fields("shortid", "date"), >> new DatePartitionFunction(),new Fields("cf", "cq")).project(new >> Fields("shortid", "cf", "cq")).groupBy(new Fields("shortid", "cf", >> "cq")).persistentAggregate(state, new Count(), new Fields("count"));* >> >> Now since i have upgraded the HBase i realized that come of the methods >> have undergone change hence moved to following code >> *TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()* >> * .withColumnFamily("RKCSV")* >> * .withColumnFields(new Fields("cq"))* >> * .withCounterFields(new Fields("value1"))* >> * .withRowKeyField("rowKey");* >> >> *HBaseState.Options options = new HBaseState.Options()* >> * .withDurability(Durability.SYNC_WAL)* >> * .withMapper(tridentHBaseMapper)* >> * .withTableName("errSmryTbl_trident");* >> >> >> >> >> >> >> >> >> >> >> >> *StateFactory factory = new HBaseStateFactory(options); TridentTopology >> topology = new TridentTopology(); BrokerHosts zk = new ZkHosts(zkHost); >> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test.topic"); >> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); >> OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf); >> Stream kafkaStream = topology.newStream("kafka", spout).shuffle(); Stream >> boltStream1 = kafkaStream.each(new Fields("str"), new >> InitializerBolt("RKCSV"), new Fields("rowKey", "cf", "cq", "value1")); >> GroupedStream groupedStream1 = boltStream1.groupBy(new Fields("rowKey", >> "cf", "cq")); groupedStream1.persistentAggregate(factory, new >> Fields("rowKey", "cf", "cq", "value1"), new ErrorInitBolt(), new >> Fields("value")); * >> But now i an facing class cast exception >> >> *java.lang.RuntimeException: java.lang.ClassCastException: >> org.apache.storm.hbase.trident.state.HBaseState cannot be cast to >> storm.trident.state.map.MapState *is there anything i m missing here? >> the aggregator that i m using is "*CombinerAggregator<List<Map<String, >> String>>>*". >> >> Regards, >> Nilesh Chhapru. >> > > > > -- > Name : 임 정택 > Blog : http://www.heartsavior.net / http://dev.heartsavior.net > Twitter : http://twitter.com/heartsavior > LinkedIn : http://www.linkedin.com/in/heartsavior > > > -- Name : 임 정택 Blog : http://www.heartsavior.net / http://dev.heartsavior.net Twitter : http://twitter.com/heartsavior LinkedIn : http://www.linkedin.com/in/heartsavior
