Hi Will,

The distinct operator is implemented as a groupBy(distinctKeys) followed
by a ReduceFunction that returns its first argument.
Hence, the result depends on the order in which the records are processed
by the ReduceFunction.
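
In the Scala DataSet API, distinct(_.key) therefore behaves roughly like
the sketch below (Record is a hypothetical case class matching the schema
in your example):

    import org.apache.flink.api.scala._

    case class Record(key: Int, val1: Int, val2: Int, val3: Int)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val mySet: DataSet[Record] = env.fromElements(
      Record(1, 0, 0, 0), Record(1, 1, 0, 0), Record(2, 1, 1, 1))

    // Equivalent in spirit to mySet.distinct(_.key): the reduce keeps
    // whichever record it sees first, so the surviving record depends
    // on processing order.
    val distinctByKey = mySet
      .groupBy(_.key)
      .reduce((a, b) => a)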

Flink does not maintain a deterministic order because doing so is quite
expensive in distributed systems.
There are a few aspects that result in a non-deterministic order:
- lazy split assignment
- combiners (which are automatically added for ReduceFunctions)
- network shuffles

There are two ways to address this issue:
1) Fully sort the input of the combiners and reducers on all attributes.
2) Use a custom ReduceFunction that compares both input records on all
(non-distinct-key) fields to determine which record to return.

I would go for the second approach because it is more efficient (no need to
fully sort before the combiner); see the sketch below.
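
A minimal sketch, reusing the Record case class and mySet from above (the
total order on the value fields is an arbitrary choice; any consistent
order works):

    // Deterministic variant: keep the smallest record per key, comparing
    // all non-key fields. Taking the minimum is associative and
    // commutative, so combiners and network shuffles cannot change the
    // result.
    val deterministicDistinct = mySet
      .groupBy(_.key)
      .reduce { (a, b) =>
        val ta = (a.val1, a.val2, a.val3)
        val tb = (b.val1, b.val2, b.val3)
        if (Ordering[(Int, Int, Int)].lt(ta, tb)) a else b
      }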

Best, Fabian

2018-08-09 18:12 GMT+02:00 Will Bastian <will.s.bast...@gmail.com>:

> I'm operating on a data set with some challenges to overcome. They are:
>
>    1. There is the possibility of multiple entries for a single key,
>    and
>    2. For a single key, there may be multiple unique value-tuples
>
> For example
> key, val1, val2, val3
> 1,      0,    0,    0
> 1,      0,    0,    0
> 1,      1,    0,    0
> 2,      1,    1,    1
> 2,      1,    1,    1
> 2,      1,    1,    0
> 1,      0,    0,    0
>
> I've found that, when executing mySet.distinct(_.key) on the above, the
> final results suggest distinct isn't always pulling the same
> record/value-tuple on every run.
>
> Fully understanding that the use of distinct I've outlined above isn't
> optimal (we don't know or care which value-tuple we get; we just want it
> to be consistent on each run), I wanted to validate whether what I believe
> I'm observing is accurate. Specifically, in this example, is Flink reducing
> by key with no concern for value, and can we expect that we may pull
> different instances back on each distinct call?
>
> Thanks,
> Will
>
