Sorry for posting again; I really need someone to point me in the right direction. :)
Thanks
On Dec 31, 2014 10:34 AM, "Sa Li" <[email protected]> wrote:

> Hi, All
>
> Happy new year!
>
> I am using TridentKafkaSpout to collect data from Kafka, together with a
> Trident state. Here is the code:
>
>
> static class EventUpdater implements ReducerAggregator<List<String>> {
>
>     @Override
>     public List<String> init() {
>         return null;
>     }
>
>     @Override
>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>         List<String> updated;
>         if (curr == null) {
>             String event = (String) tuple.getValue(1);
>             updated = Lists.newArrayList(event);
>         } else {
>             updated = curr;
>         }
>         return updated;
>     }
> }
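> As a sanity check on this aggregator, I traced the reduce logic in plain
> Java (no Storm classes; plain strings stand in for the tuples, and
> ReduceSim is just my stand-in name). Because reduce() returns curr
> unchanged once it is non-null, every event after the first in a batch is
> dropped:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for how Trident drives a ReducerAggregator:
// reduce() is called once per tuple, threading the accumulator through.
public class ReduceSim {

    // Mirrors EventUpdater.reduce: only populates the list when curr is null.
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            return new ArrayList<>(Arrays.asList(event));
        }
        return curr; // later events in the batch are ignored here
    }

    // Mirrors one batch: start from init() == null, fold every event in.
    static List<String> fold(List<String> batch) {
        List<String> acc = null;
        for (String event : batch) {
            acc = reduce(acc, event);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<String> out = fold(Arrays.asList("event1", "event2", "event3"));
        System.out.println(out); // only "event1" survives the batch
    }
}
```

> So even if the spout delivered a full batch, this reducer would only ever
> keep the first event it sees per key.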
>
>
> BrokerHosts zk = new ZkHosts(CrunchifyGetPropertyValues().get(2));
> TridentKafkaConfig spoutConf =
>     new TridentKafkaConfig(zk, CrunchifyGetPropertyValues().get(3));
> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
> spoutConf.forceFromStart = true;
> OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
> TridentTopology topology = new TridentTopology();
>
> final DBStateConfig config = new DBStateConfig();
> config.setUrl(dburl);
> config.setTable("test.state");
> config.setKeyColumns(new String[]{"userid"});
> config.setValueColumns(new String[]{"event"});
> config.setType(StateType.OPAQUE);
> config.setCacheSize(500);
>
> topology.newStream("topictestspout", kafkaSpout)
>     .each(new Fields("str"),
>           new JsonObjectParse(),
>           new Fields("userid", "event"))
>     .parallelismHint(6)
>     .groupBy(new Fields("userid"))
>     .persistentAggregate(DBState.newFactory(config),
>                          new Fields("userid", "event"),
>                          new EventUpdater(),
>                          new Fields("eventword"));
>
>
> In the state, which is implemented via IBackingMap<T>, I print the keys
> inside multiPut(final List<List<Object>> keys, final List<T> values), and
> I found that it only gets one tuple out of each batch of tuples:
>
> multiPut start .....
> keys = [[0]]
> values =
> [[{"time":1419441764,"machine":"SALI","user":0,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v5","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":1001.0,"rendertime":200.0,"totaltime":1138.0}]]
>
> multiPut start .....
> keys = [[0]]
> values =
> [[{"time":1419441764,"machine":"SALI","user":0,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v5","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":1001.0,"rendertime":200.0,"totaltime":1138.0}]]
>
> multiPut start .....
> keys = [[0]]
> values =
> [[{"time":1419441764,"machine":"SALI","user":0,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v5","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":1001.0,"rendertime":200.0,"totaltime":1138.0}]]
>
>
> To figure out what the problem is, I replaced kafkaSpout with a
> RandomTupleSpout:
>
> static class RandomTupleSpout implements IBatchSpout {
>
>     private transient Random random;
>     private static final int BATCH = 40;
>
>     @Override
>     @SuppressWarnings("rawtypes")
>     public void open(final Map conf, final TopologyContext context) {
>         random = new Random();
>     }
>
>     @Override
>     public void emitBatch(final long batchId, final TridentCollector collector) {
>         for (int i = 0; i < BATCH; i++) {
>             collector.emit(new Values(i, "test string inserted into this table " + i));
>         }
>     }
>
>     @Override
>     public void ack(final long batchId) {}
>
>     @Override
>     public void close() {}
>
>     @Override
>     @SuppressWarnings("rawtypes")
>     public Map getComponentConfiguration() {
>         return null;
>     }
>
>     @Override
>     public Fields getOutputFields() {
>         return new Fields("userid", "event");
>         // return new Fields("event");
>     }
> }
>
> With this spout, multiPut receives the whole batch at once:
> multiPut start .....
> [[37], [38], [39], [33], [34], [35], [36], [0], [3], [4], [1], [2], [7],
> [8], [5], [6], [11], [12], [9], [10], [15], [16], [13], [14], [20], [19],
> [18], [17], [24], [23], [22], [21], [28], [27], [26], [25], [32], [31],
> [30], [29]]
> keys = [[37], [38], [39], [33], [34], [35], [36], [0], [3], [4], [1], [2],
> [7], [8], [5], [6], [11], [12], [9], [10], [15], [16], [13], [14], [20],
> [19], [18], [17], [24], [23], [22], [21], [28], [27], [26], [25], [32],
> [31], [30], [29]]
> values = [[test string inserted into this table 37], [test string inserted
> into this table 38], [test string inserted into this table 39], [test
> string inserted into this table 33], [test string inserted into this table
> 34], [test string inserted into this table 35], [test string inserted into
> this table 36], [test string inserted into this table 0], [test string
> inserted into this table 3], [test string inserted into this table 4],
> [test string inserted into this table 1], [test string inserted into this
> table 2], [test string inserted into this table 7], [test string inserted
> into this table 8], [test string inserted into this table 5], [test string
> inserted into this table 6], [test string inserted into this table 11],
> [test string inserted into this table 12], [test string inserted into this
> table 9], [test string inserted into this table 10], [test string inserted
> into this table 15], [test string inserted into this table 16], [test
> string inserted into this table 13], [test string inserted into this table
> 14], [test string inserted into this table 20], [test string inserted into
> this table 19], [test string inserted into this table 18], [test string
> inserted into this table 17], [test string inserted into this table 24],
> [test string inserted into this table 23], [test string inserted into this
> table 22], [test string inserted into this table 21], [test string inserted
> into this table 28], [test string inserted into this table 27], [test
> string inserted into this table 26], [test string inserted into this table
> 25], [test string inserted into this table 32], [test string inserted into
> this table 31], [test string inserted into this table 30], [test string
> inserted into this table 29]]
>
> This makes me think that the Kafka spout isn't delivering data to the
> state in batches (while RandomTupleSpout implements IBatchSpout directly).
> How can I fix this? In addition, I suspect my EventUpdater, which
> implements ReducerAggregator, might also be causing the fields problem.
>
>
> Any help?
>
>
> thanks a lot
>
>
> AL
