Hi all,
Happy New Year!
I am using TridentKafkaSpout to collect data from Kafka, and I have a
Trident state as well. Here is the code:
static class EventUpdater implements ReducerAggregator<List<String>> {

    @Override
    public List<String> init() {
        return null;
    }

    @Override
    public List<String> reduce(List<String> curr, TridentTuple tuple) {
        List<String> updated = null;
        if (curr == null) {
            String event = (String) tuple.getValue(1);
            updated = Lists.newArrayList(event);
        } else {
            updated = curr;
        }
        return updated;
    }
}
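One thing I notice about my own reduce() above: once curr is non-null, the
event of every later tuple is discarded, so at most one event per key
survives a batch. A quick plain-Java simulation of just that fold logic (no
Storm classes, same branching as my reduce):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceSim {

    // Same logic as EventUpdater.reduce: keep curr as soon as it is non-null.
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            return new ArrayList<>(Arrays.asList(event));
        }
        return curr;
    }

    public static void main(String[] args) {
        List<String> state = null; // init() returns null
        for (String event : Arrays.asList("e1", "e2", "e3")) {
            state = reduce(state, event);
        }
        System.out.println(state); // prints [e1] -- only the first event survives
    }
}
```

So even if multiPut saw every tuple, the stored value for a key would still
hold only the first event of the batch.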
BrokerHosts zk = new ZkHosts(CrunchifyGetPropertyValues().get(2));
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, CrunchifyGetPropertyValues().get(3));
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
spoutConf.forceFromStart = true;
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
TridentTopology topology = new TridentTopology();

final DBStateConfig config = new DBStateConfig();
config.setUrl(dburl);
config.setTable("test.state");
config.setKeyColumns(new String[]{"userid"});
config.setValueColumns(new String[]{"event"});
config.setType(StateType.OPAQUE);
config.setCacheSize(500);

topology.newStream("topictestspout", kafkaSpout)
        .each(new Fields("str"),
              new JsonObjectParse(),
              new Fields("userid", "event"))
        .parallelismHint(6)
        .groupBy(new Fields("userid"))
        .persistentAggregate(DBState.newFactory(config),
                             new Fields("userid", "event"),
                             new EventUpdater(),
                             new Fields("eventword"));
In my state, which is implemented with IBackingMap<T>, I print the keys
inside multiPut(final List<List<Object>> keys, final List<T> values). I
found that it only gets one tuple out of a batch of tuples; see:
multiPut start .....
keys = [[0]]
values =
[[{"time":1419441764,"machine":"SALI","user":0,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v5","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":1001.0,"rendertime":200.0,"totaltime":1138.0}]]
multiPut start .....
keys = [[0]]
values =
[[{"time":1419441764,"machine":"SALI","user":0,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v5","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":1001.0,"rendertime":200.0,"totaltime":1138.0}]]
multiPut start .....
keys = [[0]]
values =
[[{"time":1419441764,"machine":"SALI","user":0,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v5","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":1001.0,"rendertime":200.0,"totaltime":1138.0}]]
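One thing I wonder about while staring at this: every test message I
publish has "user":0, and groupBy(new Fields("userid")) should hand
multiPut one key per distinct userid per batch, not one key per tuple. If
that is right, a single key [[0]] might be expected here. A plain-Java
sketch of that grouping (a LinkedHashMap standing in for Trident's
grouping, which is an assumption on my part, not Trident's actual code):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupBySim {

    // Group (userid, event) pairs by userid, as I understand groupBy to do.
    static Map<Integer, List<String>> group(List<Map.Entry<Integer, String>> tuples) {
        Map<Integer, List<String>> groups = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> t : tuples) {
            groups.computeIfAbsent(t.getKey(), k -> new ArrayList<>()).add(t.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> batch = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            // every event in my Kafka test data has user 0
            batch.add(new AbstractMap.SimpleEntry<>(0, "event" + i));
        }
        System.out.println(group(batch).keySet()); // prints [0] -- one distinct key
    }
}
```

With 40 tuples but one distinct userid, multiPut would still see keys = [[0]].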
To figure out what the problem is, I replaced the kafkaSpout with a
RandomTupleSpout:
static class RandomTupleSpout implements IBatchSpout {

    private transient Random random;
    private static final int BATCH = 40;

    @Override
    @SuppressWarnings("rawtypes")
    public void open(final Map conf, final TopologyContext context) {
        random = new Random();
    }

    @Override
    public void emitBatch(final long batchId, final TridentCollector collector) {
        for (int i = 0; i < BATCH; i++) {
            collector.emit(new Values(i, "test string inserted into this table " + i));
        }
    }

    @Override
    public void ack(final long batchId) {}

    @Override
    public void close() {}

    @Override
    @SuppressWarnings("rawtypes")
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("userid", "event");
        // return new Fields("event");
    }
}
Now I get the whole batch at once, like:
multiPut start .....
keys = [[37], [38], [39], [33], [34], [35], [36], [0], [3], [4], [1], [2], [7], [8], [5], [6], [11], [12], [9], [10], [15], [16], [13], [14], [20], [19], [18], [17], [24], [23], [22], [21], [28], [27], [26], [25], [32], [31], [30], [29]]
values = [[test string inserted into this table 37], [test string inserted
into this table 38], [test string inserted into this table 39], [test
string inserted into this table 33], [test string inserted into this table
34], [test string inserted into this table 35], [test string inserted into
this table 36], [test string inserted into this table 0], [test string
inserted into this table 3], [test string inserted into this table 4],
[test string inserted into this table 1], [test string inserted into this
table 2], [test string inserted into this table 7], [test string inserted
into this table 8], [test string inserted into this table 5], [test string
inserted into this table 6], [test string inserted into this table 11],
[test string inserted into this table 12], [test string inserted into this
table 9], [test string inserted into this table 10], [test string inserted
into this table 15], [test string inserted into this table 16], [test
string inserted into this table 13], [test string inserted into this table
14], [test string inserted into this table 20], [test string inserted into
this table 19], [test string inserted into this table 18], [test string
inserted into this table 17], [test string inserted into this table 24],
[test string inserted into this table 23], [test string inserted into this
table 22], [test string inserted into this table 21], [test string inserted
into this table 28], [test string inserted into this table 27], [test
string inserted into this table 26], [test string inserted into this table
25], [test string inserted into this table 32], [test string inserted into
this table 31], [test string inserted into this table 30], [test string
inserted into this table 29]]
This makes me think that KafkaSpout doesn't fetch data in batches (while
RandomTupleSpout implements IBatchSpout). How can I fix this problem? In
addition, I suspect my EventUpdater, which implements ReducerAggregator,
might also be causing the fields problem.
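In case the aggregator is part of it, this appending variant is what I have
in mind (a sketch only; I use a plain String parameter here instead of
TridentTuple so it compiles without the Storm jars, and in the real
aggregator the event would still come from tuple.getValue(1)):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical variant of my EventUpdater that accumulates every event
// for a key instead of keeping only the first one.
public class AppendingUpdater {

    public List<String> init() {
        return new ArrayList<>(); // start with an empty list, not null
    }

    public List<String> reduce(List<String> curr, String event) {
        // copy first so the previous state is never mutated in place
        List<String> updated = (curr == null) ? new ArrayList<>() : new ArrayList<>(curr);
        updated.add(event); // keep every event, not just the first
        return updated;
    }
}
```

With this, reducing e1 then e2 would leave [e1, e2] in the state rather
than just [e1].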
Any help is appreciated.
Thanks a lot,
AL