Hi all,
In Trident, what is the best way to keep only distinct tuples based on
specified fields? This operation must be applied per batch.
I ended up implementing an Aggregator as follows:
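import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Storm 1.x+ package names; older releases use storm.trident.* and backtype.storm.tuple.Fields
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;

// Keeps the first tuple seen for each distinct combination of the given fields, per batch.
public class Distinct extends BaseAggregator<Map<List<Object>, List<Object>>> {

    private final Fields fields;

    public Distinct(Fields fields) {
        this.fields = fields;
    }

    @Override
    public Map<List<Object>, List<Object>> init(Object batchId, TridentCollector collector) {
        // A fresh map for every batch, so duplicates are only detected within a batch
        return new HashMap<>();
    }

    @Override
    public void aggregate(Map<List<Object>, List<Object>> state, TridentTuple tuple,
                          TridentCollector collector) {
        // Build the key from the selected fields only
        List<Object> key = new ArrayList<>(fields.size());
        for (String field : fields.toList()) {
            key.add(tuple.getValueByField(field));
        }
        // Keep the first tuple seen for this key
        state.putIfAbsent(key, tuple.getValues());
    }

    @Override
    public void complete(Map<List<Object>, List<Object>> state, TridentCollector collector) {
        // Emit one tuple per distinct key at the end of the batch
        state.values().forEach(collector::emit);
    }
}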
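For reference, the per-batch logic of the aggregator above (init, then aggregate once per tuple, then complete) can be exercised without a Storm cluster. This is a plain-Java sketch, not Trident code: tuples are modeled as maps from field name to value, and the names DistinctSketch, distinctBy and tuple are hypothetical. It uses a LinkedHashMap so the first-seen order is kept, whereas the HashMap in the aggregator emits in unspecified order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the Distinct aggregator's per-batch logic (not Trident code).
// Assumption: a "tuple" is a Map from field name to value.
public class DistinctSketch {

    // Mirrors init -> aggregate (once per tuple) -> complete for a single batch.
    public static List<Map<String, Object>> distinctBy(
            List<Map<String, Object>> batch, List<String> keyFields) {
        // init: fresh state for this batch; LinkedHashMap keeps first-seen order
        Map<List<Object>, Map<String, Object>> state = new LinkedHashMap<>();
        for (Map<String, Object> tuple : batch) {
            // aggregate: the key is built from the selected fields only
            List<Object> key = new ArrayList<>(keyFields.size());
            for (String field : keyFields) {
                key.add(tuple.get(field));
            }
            // keep the first tuple seen for each key
            state.putIfAbsent(key, tuple);
        }
        // complete: one tuple per distinct key
        return new ArrayList<>(state.values());
    }

    // Hypothetical helper building a tuple with the fields used in this thread.
    static Map<String, Object> tuple(String sensorId, String type, long ts) {
        Map<String, Object> t = new LinkedHashMap<>();
        t.put("sensor-id", sensorId);
        t.put("type", type);
        t.put("ts", ts);
        return t;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> batch = Arrays.asList(
                tuple("s1", "temp", 1L),
                tuple("s2", "temp", 2L),
                tuple("s3", "humidity", 3L));
        // Distinct on "type": keeps the s1 and s3 tuples
        System.out.println(distinctBy(batch, Arrays.asList("type")));
    }
}
```

Because the key is a List of field values, distinct on several fields (for example "sensor-id" and "type") works the same way.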
However, that implementation seems cumbersome because we have to
declare all the input/output fields:
stream.partitionBy(new Fields("type"))
      .partitionAggregate(new Fields("sensor-id", "type", "ts"),
                          new Distinct(new Fields("type")),
                          new Fields("sensor-id", "type", "ts"));
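The partitionBy(new Fields("type")) step is what makes a partition-local distinct correct for the whole batch: all tuples sharing a "type" value are routed to the same partition, so no duplicate can survive in two different partitions. The plain-Java sketch below illustrates this with string keys. The hash-modulo routing is an assumption standing in for Trident's fields grouping, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java sketch (not Trident code): route keys to partitions, then dedupe
// inside each partition only, as a partition-local aggregation would.
public class PartitionDistinctSketch {

    // byKey = true: assumed stand-in for fields grouping (hash of key modulo partitions).
    // byKey = false: key-agnostic round-robin routing by arrival index.
    static List<String> localDistinct(List<String> keys, int numPartitions, boolean byKey) {
        List<Set<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new LinkedHashSet<>());
        }
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            int p = byKey ? Math.floorMod(key.hashCode(), numPartitions) : i % numPartitions;
            partitions.get(p).add(key); // each Set keeps one copy per partition
        }
        // Concatenate the per-partition results, as the downstream stream would see them
        List<String> result = new ArrayList<>();
        for (Set<String> partition : partitions) {
            result.addAll(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> types = Arrays.asList("temp", "temp", "humidity", "humidity");
        // Routed by key: each type appears exactly once overall
        System.out.println(localDistinct(types, 2, true).size());  // 2
        // Round-robin: each type survives once in every partition
        System.out.println(localDistinct(types, 2, false).size()); // 4
    }
}
```

With key-agnostic routing, the same key can be kept once per partition, which is why a partition-local distinct needs the stream to be partitioned on (a subset of) the distinct fields first.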
Another solution could be to use a Filter, but is there a way to get the
batch ID from within a Filter?
A nice feature would be to have this operation directly on the Stream
class:
stream.distinct(new Fields("type")) and stream.partitionDistinct(new
Fields("type")).
Thank you in advance.
--
Florian HUSSONNOIS