I'm sorry I misunderstood you.

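persistentAggregate operates on a grouped stream, so you need to use
HBaseMapState and HBaseMapState.Options instead of HBaseState and
HBaseState.Options. On a grouped stream it expects a MapState, which is
why HBaseState triggers the ClassCastException.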
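
To illustrate why the cast fails, here is a minimal self-contained sketch.
It does not use the real Storm classes: State, MapState, PlainState,
InMemoryMapState and this persistentAggregate method are hypothetical
stand-ins I made up to mirror the roles of storm.trident.state.State,
storm.trident.state.map.MapState, HBaseState and HBaseMapState. The
assumption is only that the grouped aggregation needs per-key access to
the state, as the exception message suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MapStateCastDemo {
    /** Hypothetical stand-in for storm.trident.state.State. */
    interface State {}

    /** Hypothetical stand-in for storm.trident.state.map.MapState (keyed access). */
    interface MapState extends State {
        List<Long> multiGet(List<String> keys);
        void multiPut(List<String> keys, List<Long> values);
    }

    /** Plays the role of HBaseState: a State with no per-key access. */
    static class PlainState implements State {}

    /** Plays the role of HBaseMapState: keyed storage, here just an in-memory map. */
    static class InMemoryMapState implements MapState {
        private final Map<String, Long> store = new HashMap<>();

        public List<Long> multiGet(List<String> keys) {
            List<Long> out = new ArrayList<>();
            for (String k : keys) {
                out.add(store.getOrDefault(k, 0L)); // missing keys count as 0
            }
            return out;
        }

        public void multiPut(List<String> keys, List<Long> values) {
            for (int i = 0; i < keys.size(); i++) {
                store.put(keys.get(i), values.get(i));
            }
        }
    }

    /** Mimics a grouped count: one counter per group key, kept in the state. */
    static void persistentAggregate(State state, List<String> groupKeys) {
        MapState map = (MapState) state; // ClassCastException if state is a PlainState
        for (String key : groupKeys) {
            List<String> k = Collections.singletonList(key);
            long current = map.multiGet(k).get(0);
            map.multiPut(k, Collections.singletonList(current + 1));
        }
    }

    public static void main(String[] args) {
        InMemoryMapState ok = new InMemoryMapState();
        persistentAggregate(ok, Arrays.asList("a", "b", "a"));
        System.out.println(ok.multiGet(Arrays.asList("a", "b"))); // prints [2, 1]

        try {
            persistentAggregate(new PlainState(), Arrays.asList("a"));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: PlainState is not a MapState");
        }
    }
}
```

The keyed state works because the aggregation reads and writes one value
per group key. The plain state fails at the cast, which is the same shape
of failure as "HBaseState cannot be cast to MapState".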

You can refer to these two links for more details.
a) http://storm.apache.org/documentation/Trident-API-Overview.html -
"Operations on grouped streams" section
b) http://storm.apache.org/documentation/Trident-state.html -
"persistentAggregate" section

Regards,
Jungtaek Lim (HeartSaVioR)


2015-08-06 16:15 GMT+09:00 Nilesh Chhapru <[email protected]>:

> Hi ,
>
> I have implemented the same "ptgoetz <https://github.com/ptgoetz>" code,
> and it gives me the exception.
>
> Have you or anyone else used persistentAggregate with Trident on the new
> version of HBase?
>
> Regards,
> Nilesh Chhapru.
>
>
> On Thursday 06 August 2015 03:35 AM, 임정택 wrote:
>
> Hi,
>
> I didn't look into it, but it seems to use an old or unmanaged module.
> The official external module for HBase is here:
> https://github.com/apache/storm/tree/master/external/storm-hbase
> It has shipped with Storm since 0.9.3.
>
> Please refer to its README.md in the GitHub repository, and to the link
> below to see all versions.
>
> http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.apache.storm%22%20AND%20a%3A%22storm-hbase%22
>
> Hope this helps.
>
> Regards,
> Jungtaek Lim (HeartSaVioR)
>
>
> 2015-08-05 23:53 GMT+09:00 Nilesh Chhapru <
> [email protected]>:
>
>> Hi All,
>>
>> I have recently upgraded from HBase 0.94 / Hadoop1 to HBase 0.98 /
>> Hadoop2, which resulted in compilation errors in my Storm Trident topology.
>>
>> I was using the following code to aggregate and store the state to HBase
>> (reference:
>> https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java
>> ):
>>
>> TridentConfig config = new TridentConfig("shorturl", "shortid");
>> config.setBatch(false);
>> StateFactory state = HBaseAggregateState.transactional(config);
>> TridentTopology topology = new TridentTopology();
>> topology.newStream("spout", spout)
>>     .each(new Fields("shortid", "date"), new DatePartitionFunction(),
>>           new Fields("cf", "cq"))
>>     .project(new Fields("shortid", "cf", "cq"))
>>     .groupBy(new Fields("shortid", "cf", "cq"))
>>     .persistentAggregate(state, new Count(), new Fields("count"));
>>
>> Now that I have upgraded HBase, I realized that some of the methods
>> have changed, so I moved to the following code:
>> TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()
>>                     .withColumnFamily("RKCSV")
>>                     .withColumnFields(new Fields("cq"))
>>                     .withCounterFields(new Fields("value1"))
>>                     .withRowKeyField("rowKey");
>>
>> HBaseState.Options options = new HBaseState.Options()
>>                 .withDurability(Durability.SYNC_WAL)
>>                 .withMapper(tridentHBaseMapper)
>>                 .withTableName("errSmryTbl_trident");
>>
>> StateFactory factory = new HBaseStateFactory(options);
>> TridentTopology topology = new TridentTopology();
>> BrokerHosts zk = new ZkHosts(zkHost);
>> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test.topic");
>> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>> OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>> Stream kafkaStream = topology.newStream("kafka", spout).shuffle();
>> Stream boltStream1 = kafkaStream.each(new Fields("str"),
>>     new InitializerBolt("RKCSV"), new Fields("rowKey", "cf", "cq", "value1"));
>> GroupedStream groupedStream1 = boltStream1.groupBy(new Fields("rowKey", "cf", "cq"));
>> groupedStream1.persistentAggregate(factory,
>>     new Fields("rowKey", "cf", "cq", "value1"), new ErrorInitBolt(), new Fields("value"));
>>
>> But now I am facing a ClassCastException:
>>
>> java.lang.RuntimeException: java.lang.ClassCastException:
>> org.apache.storm.hbase.trident.state.HBaseState cannot be cast to
>> storm.trident.state.map.MapState
>>
>> Is there anything I am missing here?
>> The aggregator that I am using is a "CombinerAggregator<List<Map<String,
>> String>>>".
>>
>> Regards,
>> Nilesh Chhapru.
>>
>
>
>
> --
> Name : 임 정택
> Blog : http://www.heartsavior.net / http://dev.heartsavior.net
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>
>
>


