To diagnose that, can you please check the following:

  - Change the Person data type to be immutable (final fields, no setters,
set fields in constructor instead). Does that make the problem go away?

  - Change the Person data type so that it is no longer a POJO by adding a dummy
field that is never used and has no getter/setter. Does that make the
problem go away?
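As a rough sketch of the two variants above, here is a Person reduced to two hypothetical fields, `name` and `age`. The real fields are not shown in the thread, and the class names are illustrative only. As far as I know, Flink's type extraction treats a class as a POJO only if it has a public no-argument constructor and every field is either public and non-final or reachable through a public getter/setter pair. Both variants break those rules, so Flink should fall back to its generic (Kryo-based) serializer. The first variant also rules out in-place mutation of records.

```java
// Sketch of the two diagnostic variants of a hypothetical Person type.
// The fields (name, age) are assumptions; the real Person class is not shown in the thread.
public class PersonVariants {

    // Variant 1: immutable. Final fields, no setters, all state set in the constructor.
    // It has no public no-argument constructor, so it does not follow Flink's POJO rules.
    public static final class ImmutablePerson {
        private final String name;
        private final int age;

        public ImmutablePerson(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public String getName() { return name; }
        public int getAge() { return age; }

        // "Modifying" a record returns a new object instead of mutating this one.
        public ImmutablePerson withAge(int newAge) {
            return new ImmutablePerson(name, newAge);
        }
    }

    // Variant 2: the original mutable bean plus an unused private field with no
    // getter/setter, which should make type extraction stop treating it as a POJO.
    public static class PersonWithDummyField {
        private String name;
        private int age;
        private int unusedDummy; // never read or written; only here to break the POJO rules

        public PersonWithDummyField() {}

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    public static void main(String[] args) {
        ImmutablePerson alice = new ImmutablePerson("Alice", 30);
        ImmutablePerson older = alice.withAge(31);
        System.out.println(alice.getAge() + " " + older.getAge()); // prints "30 31"
    }
}
```

If the immutable variant makes the wrong results disappear, the original records were most likely being mutated after they were emitted.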

If either of these is the case, it must be a mutability bug somewhere, caused by
either accidental object reuse or accidental serializer sharing.
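The sketch below is a plain-Java illustration of what "accidental object reuse" means. It has no Flink dependency and is not how the Flink runtime is implemented. A producer mutates and re-emits a single instance, while the consumer keeps references to everything it receives. `MutablePerson`, `emitWithReuse`, and `emitFresh` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink) of how reusing one mutable object corrupts results
// when a downstream consumer keeps references to the records it receives.
public class ObjectReuseDemo {

    // Hypothetical mutable record type standing in for Person.
    static final class MutablePerson {
        String name;
        int count;

        MutablePerson(String name, int count) {
            this.name = name;
            this.count = count;
        }

        @Override
        public String toString() {
            return name + "=" + count;
        }
    }

    // Buggy producer: mutates and re-emits one shared instance for every record.
    static void emitWithReuse(String[] names, List<MutablePerson> out) {
        MutablePerson reused = new MutablePerson(null, 0);
        for (String n : names) {
            reused.name = n;
            reused.count = n.length();
            out.add(reused); // the list ends up holding N references to the same object
        }
    }

    // Safe producer: creates a fresh object for every emitted record.
    static void emitFresh(String[] names, List<MutablePerson> out) {
        for (String n : names) {
            out.add(new MutablePerson(n, n.length()));
        }
    }

    public static void main(String[] args) {
        String[] names = {"ann", "bob", "carol"};

        List<MutablePerson> buggy = new ArrayList<>();
        emitWithReuse(names, buggy);
        System.out.println(buggy); // [carol=5, carol=5, carol=5]

        List<MutablePerson> safe = new ArrayList<>();
        emitFresh(names, safe);
        System.out.println(safe); // [ann=3, bob=3, carol=5]
    }
}
```

In Flink itself, object reuse is switched on and off through `ExecutionConfig` (`enableObjectReuse()` / `disableObjectReuse()`) and is disabled by default. User functions that hold on to or modify objects they have already emitted can still produce the same effect.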


On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Simone and Flavio,
>
> I created FLINK-9031 [1] for this issue.
> Please have a look and add any detail that you think could help to resolve
> the problem.
>
> Thanks,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9031
>
> 2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:
>
>> Hi Fabian,
>>
>> This simple code reproduces the behavior ->
>> https://github.com/xseris/Flink-test-union
>>
>> Thanks, Simone.
>>
>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>
>> Hmmm, I still don't see the problem.
>> IMO, the result should be correct for both plans. The data is replicated,
>> filtered, reduced, and unioned.
>> There is nothing between the filter and the reduce that could cause
>> incorrect behavior.
>>
>> The good thing is, the optimizer seems to be fine. The bad thing is, it
>> is either the Flink runtime code or your functions.
>> Given that one plan produces good results, it might be the Flink runtime
>> code.
>>
>> Coming back to my previous question.
>> Can you provide a minimal program to reproduce the issue?
>>
>> Thanks, Fabian
>>
>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Ah, thanks for the update!
>>> I'll have a look at that.
>>>
>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>>> Hi Simone,
>>>>
>>>> Looking at the plan, I don't see why this should be happening. The
>>>> pseudo code looks fine as well.
>>>> Any chance that you can create a minimal program to reproduce the
>>>> problem?
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>> 2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Object reuse is not enabled. I have attached the execution plan.
>>>>>
>>>>> Thanks,
>>>>> Simone
>>>>>
>>>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Union is actually a very simple operator (not even an operator in
>>>>> Flink terms). It just merges two inputs. There is no additional logic
>>>>> involved.
>>>>> Therefore, it cannot emit any records before one of the two
>>>>> ReduceFunctions has sorted its data.
>>>>> Once the data has been sorted for the ReduceFunction, the data is
>>>>> reduced and emitted in a pipelined fashion, i.e., once the first record is
>>>>> reduced, it is forwarded into the MapFunction (which consumes the unioned
>>>>> inputs).
>>>>> So it is not unexpected that the Map starts processing before the
>>>>> ReduceFunction has terminated.
>>>>>
>>>>> Did you enable object reuse [1]?
>>>>> If yes, try to disable it. If you want to reuse objects, you have to
>>>>> be careful about how you implement your functions.
>>>>> If not, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>>>>> that was generated for the program?
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
>>>>>
>>>>>
>>>>>
>>>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>
>>>>>> Any help on this? This is very strange: the "manual" union of
>>>>>> the output of the 2 datasets is different from the Flink union of them.
>>>>>> Could it be a problem with the Flink optimizer?
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Fri, Mar 16, 2018 at 4:01 PM, simone <simone.povosca...@gmail.com> wrote:
>>>>>>
>>>>>>> Sorry, I translated the code into pseudocode too quickly. That is
>>>>>>> indeed an equals() call.
>>>>>>>
>>>>>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Just a guess, but string comparison in Java should use the equals
>>>>>>> method, not the == operator.
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Kien
>>>>>>>
>>>>>>>
>>>>>>> On 3/16/2018 9:47 PM, simone wrote:
>>>>>>>
>>>>>>> *subject.getField("field1") == "";*
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>
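Kien's point about string comparison can be shown with a small standalone Java program. `==` compares object references, not contents. A string created at runtime is therefore generally not the same object as the literal `""`, even when both are empty. `new String("")` below stands in for a value read back from deserialized data; that choice, and the class and method names, are illustrative assumptions.

```java
// Standalone illustration of reference (==) versus value (equals) comparison of strings.
public class StringCompareDemo {

    // Value comparison: true for any empty string, however it was created.
    static boolean isEmptyByEquals(String s) {
        return "".equals(s); // also null-safe: returns false for null
    }

    // Reference comparison: true only if s is the very same object as the "" literal.
    static boolean isEmptyByReference(String s) {
        return s == "";
    }

    public static void main(String[] args) {
        // new String("") forces a distinct object, similar to a string produced at runtime.
        String runtimeEmpty = new String("");
        System.out.println(isEmptyByEquals(runtimeEmpty));    // true
        System.out.println(isEmptyByReference(runtimeEmpty)); // false
    }
}
```

Writing the literal first (`"".equals(s)`) avoids a NullPointerException when the field is null. Calling `s.isEmpty()` works too, but only if `s` is known to be non-null.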
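Union only merges its two inputs, so its output should contain exactly the records of both inputs, in no particular order. One way to check Flavio's observation that the "manual" union differs from the Flink union is to collect both results and compare them as multisets (bags). The helper below is a hypothetical plain-Java sketch, not part of any Flink API. It assumes the record type implements value-based `equals()` and `hashCode()`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic helper: checks whether a result list is the multiset union
// of two input lists, ignoring order but respecting duplicates.
public class UnionCheck {

    // Counts how often each element occurs.
    static <T> Map<T, Integer> counts(List<T> items) {
        Map<T, Integer> m = new HashMap<>();
        for (T item : items) {
            m.merge(item, 1, Integer::sum);
        }
        return m;
    }

    // True if 'unioned' contains exactly the elements of 'left' plus those of 'right'.
    static <T> boolean isBagUnion(List<T> left, List<T> right, List<T> unioned) {
        Map<T, Integer> expected = counts(left);
        for (T item : right) {
            expected.merge(item, 1, Integer::sum);
        }
        return expected.equals(counts(unioned));
    }

    public static void main(String[] args) {
        List<String> left = List.of("a", "b");
        List<String> right = List.of("b", "c");
        System.out.println(isBagUnion(left, right, List.of("c", "b", "a", "b"))); // true
        System.out.println(isBagUnion(left, right, List.of("a", "b", "c")));      // false
    }
}
```

Comparing as bags rather than as lists matters here because the order in which Union interleaves its inputs is not defined.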
