Hi,

I haven't looked into it in detail, but the code seems to use an old or
unmaintained module.
The official external module for HBase is here:
https://github.com/apache/storm/tree/master/external/storm-hbase
It has been released with Storm since 0.9.3.

Please refer to its README.md in the GitHub repository, and use the link
below to see all released versions:
http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.apache.storm%22%20AND%20a%3A%22storm-hbase%22
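
For what it's worth, the exception quoted below ("HBaseState cannot be cast
to storm.trident.state.map.MapState") suggests that persistentAggregate
expects the StateFactory to produce a MapState, while HBaseState is only a
plain State. Here is a minimal, self-contained sketch of that failure mode.
Every type in it (State, MapState, PlainState, InMemoryMapState, countOne) is
a hypothetical stand-in written for illustration, not the real Trident or
storm-hbase API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins only; these are NOT the real Trident or storm-hbase classes.
class StateCastSketch {
    // Plays the role of storm.trident.state.State.
    interface State {}

    // Plays the role of storm.trident.state.map.MapState, simplified to Long counts.
    interface MapState extends State {
        List<Long> multiGet(List<String> keys);
        void multiPut(List<String> keys, List<Long> values);
    }

    // Plays the role of a state like HBaseState: a State, but not a MapState.
    static class PlainState implements State {}

    // Plays the role of a map-backed state that can be read and updated by key.
    static class InMemoryMapState implements MapState {
        private final Map<String, Long> store = new HashMap<>();

        @Override
        public List<Long> multiGet(List<String> keys) {
            List<Long> out = new ArrayList<>();
            for (String k : keys) {
                out.add(store.getOrDefault(k, 0L)); // missing keys count as 0
            }
            return out;
        }

        @Override
        public void multiPut(List<String> keys, List<Long> values) {
            for (int i = 0; i < keys.size(); i++) {
                store.put(keys.get(i), values.get(i));
            }
        }
    }

    // Mimics a persistentAggregate-style update: read the old count, add one,
    // write it back. The cast is the point where a plain State fails.
    static long countOne(State state, String key) {
        MapState map = (MapState) state; // ClassCastException for PlainState
        List<String> keys = new ArrayList<>();
        keys.add(key);
        long updated = map.multiGet(keys).get(0) + 1;
        List<Long> values = new ArrayList<>();
        values.add(updated);
        map.multiPut(keys, values);
        return updated;
    }

    public static void main(String[] args) {
        State ok = new InMemoryMapState();
        System.out.println(countOne(ok, "shortid-1")); // prints 1
        System.out.println(countOne(ok, "shortid-1")); // prints 2
        try {
            countOne(new PlainState(), "shortid-1");
        } catch (ClassCastException e) {
            System.out.println("plain State rejected: ClassCastException");
        }
    }
}
```

If I remember the storm-hbase README correctly, HBaseStateFactory is meant to
be used with partitionPersist and an HBaseUpdater rather than with
persistentAggregate, so please check the README for the exact usage.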

Hope this helps.

Regards,
Jungtaek Lim (HeartSaVioR)


2015-08-05 23:53 GMT+09:00 Nilesh Chhapru <[email protected]>:

> Hi All,
>
> I have recently upgraded from HBase 0.94 / Hadoop1 to HBase 0.98 /
> Hadoop2, which resulted in compilation errors in my Storm Trident topology.
>
> I was using the following code to aggregate and store the state to HBase
> (reference:
> https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java
> )
>
> *TridentConfig config = new TridentConfig("shorturl", "shortid");*
> *config.setBatch(false);*
> *StateFactory state = HBaseAggregateState.transactional(config);*
> *TridentTopology topology = new TridentTopology();*
> *topology.newStream("spout", spout).each(new Fields("shortid", "date"),
> new DatePartitionFunction(),new Fields("cf", "cq")).project(new
> Fields("shortid", "cf", "cq")).groupBy(new Fields("shortid", "cf",
> "cq")).persistentAggregate(state, new Count(), new Fields("count"));*
>
> Now that I have upgraded HBase, I realized that some of the methods
> have changed, hence I moved to the following code:
> *TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()*
> *                    .withColumnFamily("RKCSV")*
> *                    .withColumnFields(new Fields("cq"))*
> *                    .withCounterFields(new Fields("value1"))*
> *                    .withRowKeyField("rowKey");*
>
> *HBaseState.Options options = new HBaseState.Options()*
> *                .withDurability(Durability.SYNC_WAL)*
> *                .withMapper(tridentHBaseMapper)*
> *                .withTableName("errSmryTbl_trident");*
>
> *StateFactory factory = new HBaseStateFactory(options);*
> *TridentTopology topology = new TridentTopology();*
> *BrokerHosts zk = new ZkHosts(zkHost);*
> *TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test.topic");*
> *spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());*
> *OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);*
> *Stream kafkaStream = topology.newStream("kafka", spout).shuffle();*
> *Stream boltStream1 = kafkaStream.each(new Fields("str"),*
> *                new InitializerBolt("RKCSV"),*
> *                new Fields("rowKey", "cf", "cq", "value1"));*
> *GroupedStream groupedStream1 = boltStream1.groupBy(*
> *                new Fields("rowKey", "cf", "cq"));*
> *groupedStream1.persistentAggregate(factory,*
> *                new Fields("rowKey", "cf", "cq", "value1"),*
> *                new ErrorInitBolt(), new Fields("value"));*
>
> But now I am facing a class cast exception:
>
> *java.lang.RuntimeException: java.lang.ClassCastException:
> org.apache.storm.hbase.trident.state.HBaseState cannot be cast to
> storm.trident.state.map.MapState*
>
> Is there anything I am missing here? The aggregator that I am using is
> "*CombinerAggregator<List<Map<String, String>>>*".
>
> Regards,
> Nilesh Chhapru.
>



-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
