Hi all,

Happy new year!

I am using TridentKafkaSpout to collect data from Kafka, together with a Trident state. Here is the code:


static class EventUpdater implements ReducerAggregator<List<String>> {

    @Override
    public List<String> init() {
        return null;
    }

    @Override
    public List<String> reduce(List<String> curr, TridentTuple tuple) {
        List<String> updated = null;
        if (curr == null) {
            String event = (String) tuple.getValue(1);
            updated = Lists.newArrayList(event);
        } else {
            updated = curr;
        }
        return updated;
    }
}
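As a sanity check on that aggregator, here is a plain-Java model of how reduce folds over a batch, with the Storm types stripped out (the class name ReduceCheck and the sample events e1..e3 are made up for illustration). As written, the fold keeps only the first event per key, because the else branch returns curr without appending; the second method is a variant that accumulates every event:

```java
import java.util.ArrayList;
import java.util.List;

public class ReduceCheck {
    // Mirrors EventUpdater.reduce, with TridentTuple replaced by a plain String.
    static List<String> reduceAsWritten(List<String> curr, String event) {
        if (curr == null) {
            return new ArrayList<>(List.of(event)); // first event starts the list
        }
        return curr; // later events in the batch are dropped
    }

    // A variant that appends every event instead of keeping only the first.
    static List<String> reduceAccumulating(List<String> curr, String event) {
        List<String> updated = (curr == null) ? new ArrayList<>() : curr;
        updated.add(event);
        return updated;
    }

    public static void main(String[] args) {
        List<String> a = null, b = null;
        for (String e : List.of("e1", "e2", "e3")) {
            a = reduceAsWritten(a, e);
            b = reduceAccumulating(b, e);
        }
        System.out.println(a); // [e1]
        System.out.println(b); // [e1, e2, e3]
    }
}
```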


BrokerHosts zk = new ZkHosts(CrunchifyGetPropertyValues().get(2));
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, CrunchifyGetPropertyValues().get(3));
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
spoutConf.forceFromStart = true;
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
TridentTopology topology = new TridentTopology();

final DBStateConfig config = new DBStateConfig();
config.setUrl(dburl);
config.setTable("test.state");
config.setKeyColumns(new String[]{"userid"});
config.setValueColumns(new String[]{"event"});
config.setType(StateType.OPAQUE);
config.setCacheSize(500);

topology.newStream("topictestspout", kafkaSpout)
        .each(new Fields("str"),
              new JsonObjectParse(),
              new Fields("userid", "event"))
        .parallelismHint(6)
        .groupBy(new Fields("userid"))
        .persistentAggregate(DBState.newFactory(config),
                             new Fields("userid", "event"),
                             new EventUpdater(),
                             new Fields("eventword"));


In the state, which is implemented via IBackingMap<T>, I print the keys in multiPut(final List<List<Object>> keys, final List<T> values), and I found it only gets one tuple out of a batch of tuples, see:

multiPut start .....
keys = [[0]]
values =
[[{"time":1419441764,"machine":"SALI","user":0,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v5","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":1001.0,"rendertime":200.0,"totaltime":1138.0}]]

multiPut start .....
keys = [[0]]
values =
[[{"time":1419441764,"machine":"SALI","user":0,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v5","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":1001.0,"rendertime":200.0,"totaltime":1138.0}]]

multiPut start .....
keys = [[0]]
values =
[[{"time":1419441764,"machine":"SALI","user":0,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v5","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":1001.0,"rendertime":200.0,"totaltime":1138.0}]]


To figure out what the problem is, I replaced kafkaSpout with a RandomTupleSpout:

static class RandomTupleSpout implements IBatchSpout {
    private transient Random random;
    private static final int BATCH = 40;

    @Override
    @SuppressWarnings("rawtypes")
    public void open(final Map conf, final TopologyContext context) {
        random = new Random();
    }

    @Override
    public void emitBatch(final long batchId, final TridentCollector collector) {
        for (int i = 0; i < BATCH; i++) {
            collector.emit(new Values(i, "test string inserted into this table " + i));
        }
    }

    @Override
    public void ack(final long batchId) {}

    @Override
    public void close() {}

    @Override
    @SuppressWarnings("rawtypes")
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("userid", "event");
        // return new Fields("event");
    }
}

Now I get the whole batch at once:
multiPut start .....
keys = [[37], [38], [39], [33], [34], [35], [36], [0], [3], [4], [1], [2],
[7], [8], [5], [6], [11], [12], [9], [10], [15], [16], [13], [14], [20],
[19], [18], [17], [24], [23], [22], [21], [28], [27], [26], [25], [32],
[31], [30], [29]]
values = [[test string inserted into this table 37], [test string inserted
into this table 38], [test string inserted into this table 39], [test
string inserted into this table 33], [test string inserted into this table
34], [test string inserted into this table 35], [test string inserted into
this table 36], [test string inserted into this table 0], [test string
inserted into this table 3], [test string inserted into this table 4],
[test string inserted into this table 1], [test string inserted into this
table 2], [test string inserted into this table 7], [test string inserted
into this table 8], [test string inserted into this table 5], [test string
inserted into this table 6], [test string inserted into this table 11],
[test string inserted into this table 12], [test string inserted into this
table 9], [test string inserted into this table 10], [test string inserted
into this table 15], [test string inserted into this table 16], [test
string inserted into this table 13], [test string inserted into this table
14], [test string inserted into this table 20], [test string inserted into
this table 19], [test string inserted into this table 18], [test string
inserted into this table 17], [test string inserted into this table 24],
[test string inserted into this table 23], [test string inserted into this
table 22], [test string inserted into this table 21], [test string inserted
into this table 28], [test string inserted into this table 27], [test
string inserted into this table 26], [test string inserted into this table
25], [test string inserted into this table 32], [test string inserted into
this table 31], [test string inserted into this table 30], [test string
inserted into this table 29]]

This makes me think the KafkaSpout doesn't fetch data in batches (while RandomTupleSpout implements IBatchSpout). How can I fix this problem? In addition, I suspect my EventUpdater, which implements ReducerAggregator, might also cause the fields problem.
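For reference, these are the batch-size-related settings I am aware of for the Kafka spout and Trident; I am not sure which, if any, is the relevant knob here, so please treat this as a guess:

```java
// Larger Kafka fetch per batch (public fields inherited from storm.kafka.KafkaConfig):
spoutConf.fetchSizeBytes = 1024 * 1024;
spoutConf.bufferSizeBytes = 1024 * 1024;

// How often Trident emits a new batch, set on the topology Config:
Config conf = new Config();
conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 500);
```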


Any help is appreciated.


thanks a lot


AL
