You can put user and host in separate tuple fields and do fields grouping on those fields.

On Feb 23, 2015 6:18 AM, "Vineet Mishra" <[email protected]> wrote:
> I looked for a solution and found CustomStreamGrouping.
>
> I think this should help me, but I am getting an exception while
> implementing it:
>
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>     at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>     at clojure.lang.AFn.run(AFn.java:24)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException
>     at clojure.lang.PersistentVector.arrayFor(PersistentVector.java:107)
>     at clojure.lang.PersistentVector.nth(PersistentVector.java:111)
>     at clojure.lang.APersistentVector.get(APersistentVector.java:171)
>     at com.sd.dwh.kafka.storm.plugin.HostAPIGrouping.chooseTasks(HostAPIGrouping.java:24)
>     at backtype.storm.daemon.executor$mk_custom_grouper$fn__3151.invoke(executor.clj:49)
>     at backtype.storm.daemon.task$mk_tasks_fn$fn__3101.invoke(task.clj:158)
>     at backtype.storm.daemon.executor$fn__3441$fn__3453$bolt_emit__3480.invoke(executor.clj:663)
>     at backtype.storm.daemon.executor$fn__3441$fn$reify__3486.emit(executor.clj:698)
>     at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203)
>     at backtype.storm.task.OutputCollector.emit(OutputCollector.java:49)
>     at backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:36)
>     at backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:40)
>     at com.sd.dwh.kafka.storm.ParserBolt.execute(ParserBolt.java:76)
>     at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
>     at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>     ... 6 more
>
> Let me know if anyone has ever faced the same issue.
>
> On Mon, Feb 23, 2015 at 3:45 PM, Vineet Mishra <[email protected]> wrote:
>
>> Hi All,
>>
>> I have a topology with a Kafka spout, built with the TopologyBuilder below:
>>
>> TopologyBuilder builder = new TopologyBuilder();
>> builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
>> builder.setBolt("Parser", new ParserBolt()).globalGrouping("KafkaSpout");
>> builder.setBolt("FileBolt", new PersistBolt()).globalGrouping("Parser");
>>
>> Config config = new Config();
>> config.put(Config.TOPOLOGY_WORKERS, 4);
>> config.setNumWorkers(2);
>> config.setMaxSpoutPending(10);
>> config.setMaxTaskParallelism(10);
>>
>> I have two levels of bolts:
>>
>> 1) Parser - parses the data and emits an output tuple containing a
>>    serialized POJO.
>> 2) Persist - persists the data forwarded by the previous bolt (Parser),
>>    after some computation.
>>
>> Now, for the last bolt (PersistBolt, registered as "FileBolt"), I want a
>> fields grouping on the Parser bolt's output, based on a field value of the
>> emitted POJO.
>>
>> To make it clearer, Parser emits a POJO like this:
>>
>> collector.emit(new Values(responseHandler));
>>
>> where responseHandler is a POJO:
>>
>> public class ResponseHandler implements Serializable {
>>
>>     private String host = null;
>>     private String user = null;
>>     private String msg = null;
>>
>>     public String getHost() {
>>         return host;
>>     }
>>     public void setHost(String host) {
>>         this.host = host;
>>     }
>>     public String getUser() {
>>         return user;
>>     }
>>     public void setUser(String user) {
>>         this.user = user;
>>     }
>>     public String getMsg() {
>>         return msg;
>>     }
>>     public void setMsg(String msg) {
>>         this.msg = msg;
>>     }
>> }
>>
>> Now I am looking for a way to do fields grouping on the host and user level.
>>
>> Actively looking for a workaround!
>>
>> Thanks!
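A minimal sketch of the suggestion at the top of this thread, written in plain Java so it runs without Storm on the classpath. It assumes Parser declares the output fields ("host", "user", "handler") and emits host and user as values of their own next to the POJO. In Storm 0.9.x that would mean `declarer.declare(new Fields("host", "user", "handler"))` in `declareOutputFields`, `collector.emit(new Values(host, user, handler))` in `execute`, and `.fieldsGrouping("Parser", new Fields("host", "user"))` instead of `.globalGrouping("Parser")` for FileBolt. The class `TupleLayoutSketch` and the helpers `toTupleValues` and `select` are hypothetical names for illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TupleLayoutSketch {

    // The thread's POJO, with getUser() returning the user field (the original
    // returned an undeclared hostName) and setuser() renamed to setUser().
    public static class ResponseHandler implements Serializable {
        private static final long serialVersionUID = 1L;
        private String host;
        private String user;
        private String msg;

        public String getHost() { return host; }
        public void setHost(String host) { this.host = host; }
        public String getUser() { return user; }
        public void setUser(String user) { this.user = user; }
        public String getMsg() { return msg; }
        public void setMsg(String msg) { this.msg = msg; }
    }

    // Assumed output fields for Parser, in emit order.
    static final List<String> OUTPUT_FIELDS = Arrays.asList("host", "user", "handler");

    // Hypothetical helper: builds the tuple values in OUTPUT_FIELDS order, so host
    // and user travel as fields of their own next to the POJO.
    static List<Object> toTupleValues(ResponseHandler r) {
        return Arrays.<Object>asList(r.getHost(), r.getUser(), r);
    }

    // Picks the grouping-key values out of a tuple by field name: the same idea
    // as the key a fields grouping on ("host", "user") routes by.
    static List<Object> select(List<String> groupFields, List<Object> values) {
        List<Object> key = new ArrayList<>();
        for (String field : groupFields) {
            int index = OUTPUT_FIELDS.indexOf(field);
            if (index < 0) {
                throw new IllegalArgumentException("unknown field: " + field);
            }
            key.add(values.get(index));
        }
        return key;
    }

    public static void main(String[] args) {
        ResponseHandler r = new ResponseHandler();
        r.setHost("web-01");
        r.setUser("alice");
        r.setMsg("GET /index.html");

        List<Object> values = toTupleValues(r);
        System.out.println(select(Arrays.asList("host", "user"), values)); // [web-01, alice]
    }
}
```

Note that a fields grouping only spreads work if FileBolt runs with more than one task, e.g. by passing a parallelism hint to `setBolt`; with a single task every grouping delivers all tuples to that task.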

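If the tuple should stay as `new Values(responseHandler)`, a custom grouping has to read the POJO from index 0, the only value in that tuple. The trace only shows that a `get` on a Clojure vector inside `HostAPIGrouping.chooseTasks` went out of range; one plausible cause (the grouping's code is not in the thread) is reading an index such as `values.get(1)` that a one-value tuple does not have. The sketch below is plain Java so it runs without Storm; in Storm 0.9.x the same logic would live in a class implementing `backtype.storm.grouping.CustomStreamGrouping`, saving `targetTasks` in `prepare(WorkerTopologyContext, GlobalStreamId, List<Integer>)` and wired with `.customGrouping("Parser", ...)`. The class `PojoGroupingSketch` and its trimmed `ResponseHandler` are hypothetical, and its hashing need not match Storm's built-in fields grouping.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class PojoGroupingSketch {

    // Trimmed stand-in for the thread's ResponseHandler (only the grouping fields).
    public static class ResponseHandler implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String host;
        private final String user;

        public ResponseHandler(String host, String user) {
            this.host = host;
            this.user = user;
        }
        public String getHost() { return host; }
        public String getUser() { return user; }
    }

    private final List<Integer> targetTasks;

    // Plays the role of prepare(): keep a copy of the downstream task ids.
    public PojoGroupingSketch(List<Integer> targetTasks) {
        if (targetTasks.isEmpty()) {
            throw new IllegalArgumentException("no target tasks");
        }
        this.targetTasks = new ArrayList<>(targetTasks);
    }

    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // Parser emits new Values(responseHandler): exactly one value, at index 0.
        // Any other index (e.g. values.get(1)) is out of range for this tuple.
        ResponseHandler r = (ResponseHandler) values.get(0);
        // Equal (host, user) pairs give equal hashes, hence the same task.
        int index = Math.floorMod(Objects.hash(r.getHost(), r.getUser()), targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }

    public static void main(String[] args) {
        PojoGroupingSketch grouping = new PojoGroupingSketch(Arrays.asList(5, 6, 7, 8));
        List<Object> first = Arrays.<Object>asList(new ResponseHandler("web-01", "alice"));
        List<Object> second = Arrays.<Object>asList(new ResponseHandler("web-01", "alice"));
        // Two tuples with the same host and user go to the same task.
        System.out.println(grouping.chooseTasks(0, first).equals(grouping.chooseTasks(0, second))); // true
    }
}
```

Hashing both fields together keeps every host/user pair on one PersistBolt task; the trade-off is that a single very busy pair cannot be split across tasks.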