To diagnose that, can you please check the following:

- Change the Person data type to be immutable (final fields, no setters, set fields in constructor instead). Does that make the problem go away?
- Change the Person data type to not be a POJO by adding a dummy field that is never used and does not have a getter/setter. Does that make the problem go away?

If either of these is the case, it must be a mutability bug somewhere, in either accidental object reuse or accidental serializer sharing.

On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Simone and Flavio,
>
> I created FLINK-9031 [1] for this issue.
> Please have a look and add any detail that you think could help to resolve
> the problem.
>
> Thanks,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9031
>
> 2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:
>
>> Hi Fabian,
>>
>> This simple code reproduces the behavior ->
>> https://github.com/xseris/Flink-test-union
>>
>> Thanks, Simone.
>>
>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>
>> Hmmm, I still don't see the problem.
>> IMO, the result should be correct for both plans. The data is replicated,
>> filtered, reduced, and unioned.
>> There is nothing in between the filter and reduce, that could cause
>> incorrect behavior.
>>
>> The good thing is, the optimizer seems to be fine. The bad thing is, it
>> is either the Flink runtime code or your functions.
>> Given that one plan produces good results, it might be the Flink runtime
>> code.
>>
>> Coming back to my previous question.
>> Can you provide a minimal program to reproduce the issue?
>>
>> Thanks, Fabian
>>
>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Ah, thanks for the update!
>>> I'll have a look at that.
>>>
>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>>> Hi Simone,
>>>>
>>>> Looking at the plan, I don't see why this should be happening. The
>>>> pseudo code looks fine as well.
>>>> Any chance that you can create a minimal program to reproduce the
>>>> problem?
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>> 2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> reuse is not enabled. I attach the plan of the execution.
>>>>>
>>>>> Thanks,
>>>>> Simone
>>>>>
>>>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Union is actually a very simple operator (not even an operator in
>>>>> Flink terms). It just merges two inputs. There is no additional logic
>>>>> involved.
>>>>> Therefore, it should also not emit records before either of both
>>>>> ReduceFunctions sorted its data.
>>>>> Once the data has been sorted for the ReduceFunction, the data is
>>>>> reduced and emitted in a pipelined fashion, i.e., once the first record is
>>>>> reduced, it is forwarded into the MapFunction (passing the unioned
>>>>> inputs).
>>>>> So it is not unexpected that Map starts processing before the
>>>>> ReduceFunction terminated.
>>>>>
>>>>> Did you enable object reuse [1]?
>>>>> If yes, try to disable it. If you want to reuse objects, you have to
>>>>> be careful in how you implement your functions.
>>>>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>>>>> that was generated for the program?
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
>>>>>
>>>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>
>>>>>> Any help on this? This thing is very strange..the "manual" union of
>>>>>> the output of the 2 datasets is different than the flink-union of them..
>>>>>> Could it be a problem of the flink optimizer?
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Fri, Mar 16, 2018 at 4:01 PM, simone <simone.povosca...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Sorry, I translated the code into pseudocode too fast. That is
>>>>>>> indeed an equals.
>>>>>>>
>>>>>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Just a guess, but string compare in Java should be using equals
>>>>>>> method, not == operator.
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Kien
>>>>>>>
>>>>>>> On 3/16/2018 9:47 PM, simone wrote:
>>>>>>>
>>>>>>> *subject.getField("field1") == "";*
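
For reference, the two diagnostic variants of Person suggested at the top of this message could look roughly like the sketch below. The real Person class is not shown in this thread, so the field names (name, age), the class names, and the withName helper are all hypothetical and only illustrate the shape of each change.

```java
public class PersonVariants {

    // Variant 1: immutable Person. Final fields, no setters, all state is
    // set in the constructor. Field names are hypothetical.
    public static final class ImmutablePerson {
        private final String name;
        private final int age;

        public ImmutablePerson(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public String getName() { return name; }
        public int getAge() { return age; }

        // A "modification" returns a new object instead of mutating this one,
        // so an instance that is accidentally shared cannot change underneath us.
        public ImmutablePerson withName(String newName) {
            return new ImmutablePerson(newName, age);
        }
    }

    // Variant 2: still mutable, but no longer a POJO because of one private
    // field that has neither a getter nor a setter.
    public static class NonPojoPerson {
        private String name;
        private int age;

        @SuppressWarnings("unused")
        private int dummy; // never used; deliberately no getter/setter

        public NonPojoPerson() {}

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    public static void main(String[] args) {
        ImmutablePerson a = new ImmutablePerson("Ada", 36);
        ImmutablePerson b = a.withName("Bob");
        // The original instance is unchanged by withName.
        System.out.println(a.getName() + " " + b.getName());
    }
}
```

The second variant relies on Flink's POJO rules, which as far as I know require every non-static, non-transient field to be public or to have a getter and setter. With the dummy field the type should be handled as a generic type rather than by the POJO serializer, which is what makes it useful for narrowing down where the mutation happens.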