Hi all,

In Trident, what is the best way to keep only distinct tuples based on
specified fields? This operation must be applied per batch.

I ended up implementing an Aggregator as follows:

    public class Distinct extends BaseAggregator<Map<Object, List<Object>>> {
        private Fields fields;

        public Distinct(Fields fields) {
            this.fields = fields;
        }

        @Override
        public Map<Object, List<Object>> init(Object batchId, TridentCollector collector) {
            return new HashMap<>();
        }

        @Override
        public void aggregate(Map<Object, List<Object>> state, TridentTuple tuple, TridentCollector collector) {
            List<Object> values = tuple.getValues();
            // Build the key from the values of the fields to be distinct on.
            List<Object> key = fields.toList().stream()
                    .map(tuple::getValueByField)
                    .collect(Collectors.toList());
            // Keep only the first tuple seen for each key.
            if (!state.containsKey(key)) {
                state.put(key, values);
            }
        }

        @Override
        public void complete(Map<Object, List<Object>> state, TridentCollector collector) {
            state.values().forEach(collector::emit);
        }
    }


However, that implementation seems cumbersome because we have to
declare all the input and output fields:

stream.partitionBy(new Fields("type"))
      .partitionAggregate(new Fields("sensor-id", "type", "ts"),
                          new Distinct(new Fields("type")),
                          new Fields("sensor-id", "type", "ts"));

Another solution could be to use a Filter, but is there a way to get the
batch ID from within a Filter?

A nice feature would be to have this operation directly on the Stream
class:

stream.distinct(new Fields("type")) and stream.partitionDistinct(new
Fields("type")).
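
To make the semantics I have in mind concrete, here is a minimal plain-Java
sketch of the same "first tuple wins" logic applied to one batch. It does not
use any Storm class: tuples are modelled as plain lists, fields as positions,
and the names DistinctSketch and distinctBy are hypothetical, chosen only for
this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DistinctSketch {

    /**
     * Keeps the first tuple seen for each distinct combination of the values
     * found at the given positions. The whole list stands for one batch.
     */
    public static List<List<Object>> distinctBy(List<List<Object>> batch, int... keyIndexes) {
        // LinkedHashMap keeps the tuples in the order they were first seen.
        Map<List<Object>, List<Object>> seen = new LinkedHashMap<>();
        for (List<Object> tuple : batch) {
            List<Object> key = new ArrayList<>(keyIndexes.length);
            for (int i : keyIndexes) {
                key.add(tuple.get(i));
            }
            // Later tuples with an already-seen key are dropped.
            seen.putIfAbsent(key, tuple);
        }
        return new ArrayList<>(seen.values());
    }

    public static void main(String[] args) {
        // Columns: sensor-id, type, ts
        List<List<Object>> batch = Arrays.asList(
                Arrays.<Object>asList("s1", "temperature", 1L),
                Arrays.<Object>asList("s2", "temperature", 2L),
                Arrays.<Object>asList("s3", "humidity", 3L));

        // Distinct on "type" (position 1).
        System.out.println(distinctBy(batch, 1));
        // prints [[s1, temperature, 1], [s3, humidity, 3]]
    }
}
```

Unlike the HashMap in my Aggregator, the LinkedHashMap here also preserves the
order in which the kept tuples arrived, which may or may not matter for the
real operation.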

Thank you in advance.

-- 
Florian HUSSONNOIS
