Hi,

I have implemented the same code from "ptgoetz" (https://github.com/ptgoetz), and it is giving me the exception.
Have you, or has anyone else, used persistentAggregate with Trident against the new version of HBase?

Regards,
Nilesh Chhapru.

On Thursday 06 August 2015 03:35 AM, 임정택 wrote:

Hi,

I didn't look into it, but it seems to use an old or unmaintained module. The official external module for HBase is here: https://github.com/apache/storm/tree/master/external/storm-hbase. It has been released with Storm 0.9.3 and onwards.

Please refer to its README.md in the GitHub repository, and the link below to see all versions:
http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.apache.storm%22%20AND%20a%3A%22storm-hbase%22

Hope this helps.

Regards,
Jungtaek Lim (HeartSaVioR)

2015-08-05 23:53 GMT+09:00 Nilesh Chhapru <[email protected]>:

Hi All,

I have recently upgraded from HBase 0.94 / Hadoop 1 to HBase 0.98 / Hadoop 2, which resulted in compilation errors in my Storm Trident topology.

I was using the following code to aggregate and store the state in HBase (reference: https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java):

    TridentConfig config = new TridentConfig("shorturl", "shortid");
    config.setBatch(false);
    StateFactory state = HBaseAggregateState.transactional(config);

    TridentTopology topology = new TridentTopology();
    topology.newStream("spout", spout)
            .each(new Fields("shortid", "date"), new DatePartitionFunction(), new Fields("cf", "cq"))
            .project(new Fields("shortid", "cf", "cq"))
            .groupBy(new Fields("shortid", "cf", "cq"))
            .persistentAggregate(state, new Count(), new Fields("count"));

Since upgrading HBase, I realized that some of the methods have changed, so I moved to the following code:

    TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()
            .withColumnFamily("RKCSV")
            .withColumnFields(new Fields("cq"))
            .withCounterFields(new Fields("value1"))
            .withRowKeyField("rowKey");

    HBaseState.Options options = new HBaseState.Options()
            .withDurability(Durability.SYNC_WAL)
            .withMapper(tridentHBaseMapper)
            .withTableName("errSmryTbl_trident");

    StateFactory factory = new HBaseStateFactory(options);

    TridentTopology topology = new TridentTopology();
    BrokerHosts zk = new ZkHosts(zkHost);
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test.topic");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

    Stream kafkaStream = topology.newStream("kafka", spout).shuffle();
    Stream boltStream1 = kafkaStream.each(new Fields("str"), new InitializerBolt("RKCSV"),
            new Fields("rowKey", "cf", "cq", "value1"));
    GroupedStream groupedStream1 = boltStream1.groupBy(new Fields("rowKey", "cf", "cq"));
    groupedStream1.persistentAggregate(factory, new Fields("rowKey", "cf", "cq", "value1"),
            new ErrorInitBolt(), new Fields("value"));

But now I am facing a class cast exception:

    java.lang.RuntimeException: java.lang.ClassCastException:
    org.apache.storm.hbase.trident.state.HBaseState cannot be cast to storm.trident.state.map.MapState

Is there anything I am missing here? The aggregator that I am using is a CombinerAggregator<List<Map<String, String>>>.

Regards,
Nilesh Chhapru.

--
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
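
A note on the exception above: persistentAggregate requires a StateFactory whose makeState returns a storm.trident.state.map.MapState, while storm-hbase's HBaseState implements only the plain State interface, hence the ClassCastException. The storm-hbase README instead pairs HBaseStateFactory with partitionPersist and HBaseUpdater. Below is a minimal sketch of that wiring, reusing the factory, streams, and field names from the code above; the HBaseTridentWiring class name is made up for illustration, and the read path additionally assumes an HBaseValueMapper has been set on the options via withRowToStormValueMapper(...).

    import backtype.storm.tuple.Fields;
    import org.apache.storm.hbase.trident.state.HBaseQuery;
    import org.apache.storm.hbase.trident.state.HBaseStateFactory;
    import org.apache.storm.hbase.trident.state.HBaseUpdater;
    import storm.trident.Stream;
    import storm.trident.TridentState;
    import storm.trident.TridentTopology;

    // Hypothetical wrapper; "topology", "kafkaStream", "boltStream1", and
    // "factory" are the objects built in the message above.
    public class HBaseTridentWiring {

        public static void wire(TridentTopology topology,
                                Stream kafkaStream,
                                Stream boltStream1,
                                HBaseStateFactory factory) {
            // Write path: partitionPersist accepts any State, so HBaseState fits.
            // Because the mapper declares withCounterFields(new Fields("value1")),
            // each tuple is applied as an HBase Increment, so per-row counts
            // accumulate in HBase itself rather than in a Trident MapState.
            boltStream1.partitionPersist(
                    factory,
                    new Fields("rowKey", "cf", "cq", "value1"),
                    new HBaseUpdater(),
                    new Fields());

            // Read path: look rows back up by key. This needs an HBaseValueMapper
            // configured on HBaseState.Options via withRowToStormValueMapper(...).
            TridentState readState = topology.newStaticState(factory);
            kafkaStream.stateQuery(
                    readState,
                    new Fields("rowKey"),
                    new HBaseQuery(),
                    new Fields("columnName", "columnValue"));
        }
    }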

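Alternatively, if the count must remain a Trident-managed aggregate, persistentAggregate can be kept by supplying a factory that actually produces a MapState. The sketch below shows the contract using storm-core's OpaqueMap.build over an IBackingMap, with an in-memory HashMap standing in for HBase (a real implementation would issue HBase Gets in multiGet and Puts in multiPut); the class name OpaqueHBaseMapStateFactory is made up for illustration.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import backtype.storm.task.IMetricsContext;
    import storm.trident.state.OpaqueValue;
    import storm.trident.state.State;
    import storm.trident.state.StateFactory;
    import storm.trident.state.map.IBackingMap;
    import storm.trident.state.map.OpaqueMap;

    // Hypothetical factory: produces the MapState that persistentAggregate
    // expects. The HashMap backing is only a stand-in for HBase reads/writes.
    public class OpaqueHBaseMapStateFactory implements StateFactory {

        @Override
        public State makeState(Map conf, IMetricsContext metrics,
                               int partitionIndex, int numPartitions) {
            IBackingMap<OpaqueValue> backing = new IBackingMap<OpaqueValue>() {
                private final Map<List<Object>, OpaqueValue> store =
                        new HashMap<List<Object>, OpaqueValue>();

                @Override
                public List<OpaqueValue> multiGet(List<List<Object>> keys) {
                    List<OpaqueValue> result = new ArrayList<OpaqueValue>(keys.size());
                    for (List<Object> key : keys) {
                        result.add(store.get(key)); // null means "no value stored yet"
                    }
                    return result;
                }

                @Override
                public void multiPut(List<List<Object>> keys, List<OpaqueValue> vals) {
                    for (int i = 0; i < keys.size(); i++) {
                        store.put(keys.get(i), vals.get(i));
                    }
                }
            };
            // OpaqueMap.build(...) wraps the backing map into a MapState, which
            // is exactly the type persistentAggregate casts the state to.
            return OpaqueMap.build(backing);
        }
    }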