Hi Jungtaek,

When i use "HBaseMapState and HBaseMapState.Options" i need to specify the 
"qualifier" field of options, but that is dynamic and coming from one of the 
Field of a bolt stream.

Is there a way i can specify the Field when i am using "HBaseMapState".

Regards,
Nilesh Chhapru.

On Thursday 06 August 2015 01:24 PM, 임정택 wrote:
I'm sorry I misunderstood you.

persistentAggregate handles grouped stream, so you need to use HBaseMapState 
and HBaseMapState.Options instead of HBaseState and HBaseState.Options.

You can refer two links for more details.
a) http://storm.apache.org/documentation/Trident-API-Overview.html - 
"Operations on grouped streams" section
b) http://storm.apache.org/documentation/Trident-state.html - 
"persistentAggregate" section

Regards,
Jungtaek Lim (HeartSaVioR)


2015-08-06 16:15 GMT+09:00 Nilesh Chhapru 
<[email protected]<mailto:[email protected]>>:
Hi ,

I have implemented the same "ptgoetz<https://github.com/ptgoetz>" code which is 
giving me the exception.

Do you or anyone have used the persistanceAggregate with trident using new 
version of HBase.

Regards,
Nilesh Chhapru.


On Thursday 06 August 2015 03:35 AM, 임정택 wrote:
Hi,

I didn't look into it, but it seems to use old or unmanaged module.
Official extern module for HBase is here, 
https://github.com/apache/storm/tree/master/external/storm-hbase.
It has been released with Storm 0.9.3 and onwards.

Please refer its README.md on Github repository, and below link to see all 
versions.
http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.apache.storm%22%20AND%20a%3A%22storm-hbase%22

Hope this helps.

Regards,
Jungtaek Lim (HeartSaVioR)


2015-08-05 23:53 GMT+09:00 Nilesh Chhapru 
<[email protected]<mailto:[email protected]>>:
Hi All,

I have recently upgraded from HBase - 0.94 / Hadoop1 to HBase 0.98 / Hadoop2, 
which resulted compilation errors in storm trident topology.

I was using following code to aggregate and store the state to HBase (reference 
: 
https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java)

TridentConfig config = new TridentConfig("shorturl", "shortid");
config.setBatch(false);
StateFactory state = HBaseAggregateState.transactional(config);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout).each(new Fields("shortid", "date"), new 
DatePartitionFunction(),new Fields("cf", "cq")).project(new Fields("shortid", 
"cf", "cq")).groupBy(new Fields("shortid", "cf", 
"cq")).persistentAggregate(state, new Count(), new Fields("count"));

Now since i have upgraded the HBase i realized that come of the methods have 
undergone change hence moved to following code
TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()
                    .withColumnFamily("RKCSV")
                    .withColumnFields(new Fields("cq"))
                    .withCounterFields(new Fields("value1"))
                    .withRowKeyField("rowKey");

HBaseState.Options options = new HBaseState.Options()
                .withDurability(Durability.SYNC_WAL)
                .withMapper(tridentHBaseMapper)
                .withTableName("errSmryTbl_trident");

StateFactory factory = new HBaseStateFactory(options);
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts(zkHost);
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test.topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
Stream kafkaStream = topology.newStream("kafka", spout).shuffle();
Stream boltStream1 = kafkaStream.each(new Fields("str"), new 
InitializerBolt("RKCSV"), new Fields("rowKey", "cf", "cq", "value1"));
GroupedStream groupedStream1 = boltStream1.groupBy(new Fields("rowKey", "cf", 
"cq"));
groupedStream1.persistentAggregate(factory, new Fields("rowKey", "cf", "cq", 
"value1"), new ErrorInitBolt(), new Fields("value"));

But now i an facing class cast exception java.lang.RuntimeException: 
java.lang.ClassCastException: org.apache.storm.hbase.trident.state.HBaseState 
cannot be cast to storm.trident.state.map.MapState

is there anything i m missing here? the aggregator that i m using is 
"CombinerAggregator<List<Map<String, String>>>".

Regards,
Nilesh Chhapru.



--
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior




--
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Reply via email to