Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread Jan Lukavský

Hi Rahul,

I cannot tell for sure. The fix was applied in runners-core, so,
technically, multiple runners could have been affected. A runner would be
affected if, and only if, it used something that depends on the hashCode()
of StateTag (or StateSpec) and the user used a Coder for that state that
does not correctly implement hashCode() and equals(). SchemaCoder is one
such example.
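To make the condition above concrete, here is a minimal, self-contained Java sketch. It does not use Beam: IdentityCoder and Tag are hypothetical stand-ins for a Coder that does not override equals()/hashCode() and for a state tag whose equality includes its coder. Two tags built from equivalent but distinct coder instances are not equal, so a HashMap lookup with the second tag misses the entry written with the first. That is one way buffered state could appear to go missing, though this is only an illustration of the hashing contract, not Beam's actual code path.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a Coder that does NOT override equals()/hashCode(),
// so two instances describing the same schema are never equal.
final class IdentityCoder {
  final String schema;

  IdentityCoder(String schema) {
    this.schema = schema;
  }
}

// Hypothetical stand-in for a state tag whose equality includes its coder.
final class Tag {
  final String id;
  final IdentityCoder coder;

  Tag(String id, IdentityCoder coder) {
    this.id = id;
    this.coder = coder;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Tag)) {
      return false;
    }
    Tag other = (Tag) o;
    // IdentityCoder inherits Object.equals, so this compares coder identity.
    return id.equals(other.id) && coder.equals(other.coder);
  }

  @Override
  public int hashCode() {
    // Mixes in the coder's identity-based hash code.
    return Objects.hash(id, coder);
  }
}

class IdentityCoderDemo {
  // Buffers "a,b" under one tag, then reads it back with an equivalent tag
  // built from a fresh coder instance. Returns null when the lookup misses.
  static String writeThenRead() {
    Map<Tag, String> state = new HashMap<>();
    state.put(new Tag("buffer", new IdentityCoder("row-schema")), "a,b");
    return state.get(new Tag("buffer", new IdentityCoder("row-schema")));
  }

  public static void main(String[] args) {
    String found = writeThenRead();
    System.out.println(found == null ? "lookup missed" : "found " + found);
  }
}
```

Running main prints "lookup missed": the second tag is logically the same but never equal to the first.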


After a few greps through the repository, I think it is possible that the
Dataflow runner is (more or less) affected by this as well (but someone
from the Dataflow team could confirm or refute that better than I can).
The possibly affected code is in WindmillStateReader.java, which uses a
ConcurrentHashMap with StateTag as the key. I cannot tell what the
consequences of that are. I didn't find any obvious uses of a HashMap or
HashSet of StateTags in other runners, but that doesn't mean there really
aren't any. :-)
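The same hashing contract applies to a ConcurrentHashMap keyed by StateTag. As a contrast to the failing case, this sketch gives the coder value-based equals() and hashCode(), so an equivalent tag built from a fresh coder instance finds the existing entry. ValueCoder, ValueTag, and ConcurrentTagDemo are hypothetical stand-ins, not Beam's StateTag or WindmillStateReader, and this is not a description of how BEAM-7269 was fixed.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical coder stand-in that compares by the schema it encodes,
// so equivalent instances are equal and hash identically.
final class ValueCoder {
  final String schema;

  ValueCoder(String schema) {
    this.schema = schema;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof ValueCoder && schema.equals(((ValueCoder) o).schema);
  }

  @Override
  public int hashCode() {
    return schema.hashCode();
  }
}

// Hypothetical state-tag stand-in whose equality includes its coder.
final class ValueTag {
  final String id;
  final ValueCoder coder;

  ValueTag(String id, ValueCoder coder) {
    this.id = id;
    this.coder = coder;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ValueTag)) {
      return false;
    }
    ValueTag other = (ValueTag) o;
    return id.equals(other.id) && coder.equals(other.coder);
  }

  @Override
  public int hashCode() {
    return Objects.hash(id, coder);
  }
}

class ConcurrentTagDemo {
  // Writes under one tag and reads with an equivalent tag built from a fresh coder instance.
  static String writeThenRead() {
    ConcurrentMap<ValueTag, String> cache = new ConcurrentHashMap<>();
    cache.put(new ValueTag("buffer", new ValueCoder("row-schema")), "a,b");
    return cache.get(new ValueTag("buffer", new ValueCoder("row-schema")));
  }

  public static void main(String[] args) {
    System.out.println(writeThenRead()); // a,b
  }
}
```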


Either way, by using version 2.14.0 you should be safe on all runners.

Jan


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread rahul patwari
Hi Jan,

I was using Beam 2.13.0. I have upgraded to Beam 2.14.0, and the results
are now always correct. No more inconsistencies.

Does BEAM-7269 affect all the runners?

Thanks,
Rahul


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread Jan Lukavský

Hi Rahul,

What version of Beam are you using? There was a bug [1], fixed in 2.14.0,
that could cause what you observe.


Jan

[1] https://issues.apache.org/jira/browse/BEAM-7269


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread rahul patwari
Hi Robert,

When the PCollection is created using
"Create.of(listOfRow).withCoder(RowCoder.of(schema))", I am getting
"inconsistent" results. By "inconsistent", I mean that the result is
sometimes (most of the time) "incorrect". By an "incorrect" result, I mean
that elements are missing. Elements are not duplicated, and they are not
batched differently.

I have used System.identityHashCode(this) as the key to convert the
PCollection of Rows into a keyed PCollection of KVs, so that I can apply the
stateful ParDo (GroupIntoBatches), as per your suggestion in this thread
<https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>
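A note on the keying step, assuming `this` refers to the DoFn instance doing the keying: System.identityHashCode(this) returns the same value for the same object, so every element keyed by one instance gets the same key, and the number of distinct keys depends on how many instances the runner creates (identity hashes of distinct objects are also not guaranteed to differ). The hypothetical IdentityKeyer below mimics that in plain Java, without Beam types.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a DoFn that keys each element by
// System.identityHashCode(this), i.e. by the identity of the keying instance.
class IdentityKeyer {
  Map.Entry<Integer, String> key(String element) {
    return new SimpleImmutableEntry<>(System.identityHashCode(this), element);
  }

  // Keys every element with the same keyer instance.
  static List<Map.Entry<Integer, String>> keyAll(IdentityKeyer keyer, List<String> elements) {
    List<Map.Entry<Integer, String>> keyed = new ArrayList<>();
    for (String element : elements) {
      keyed.add(keyer.key(element));
    }
    return keyed;
  }

  public static void main(String[] args) {
    IdentityKeyer keyer = new IdentityKeyer();
    // All three elements print with the same key, because one instance keyed them.
    for (Map.Entry<Integer, String> kv : keyAll(keyer, Arrays.asList("r1", "r2", "r3"))) {
      System.out.println(kv.getKey() + " -> " + kv.getValue());
    }
  }
}
```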

To verify the result, I used GroupByKey, which should give the same
result as GroupIntoBatches *in my case*.
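To answer the "missing/duplicated, or just batched differently?" question mechanically, the flattened GroupIntoBatches output for a key can be compared with the GroupByKey output for the same key as multisets. A minimal sketch, assuming the elements for one key have already been collected into plain Java lists of strings; BatchCheck is a hypothetical helper, not part of Beam. An empty result means the same elements arrived and any remaining difference is only in batch boundaries; duplicated elements show up as "extra".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BatchCheck {
  // Counts how often each element occurs across the given batches.
  static Map<String, Integer> counts(List<List<String>> batches) {
    Map<String, Integer> counts = new HashMap<>();
    for (List<String> batch : batches) {
      for (String element : batch) {
        counts.merge(element, 1, Integer::sum);
      }
    }
    return counts;
  }

  // Compares the batches emitted for one key with the GroupByKey result for the same key.
  // Returns "missing x" / "extra x" entries; an empty list means only batch boundaries differ.
  static List<String> diff(List<String> grouped, List<List<String>> batches) {
    Map<String, Integer> expected = counts(Collections.singletonList(grouped));
    Map<String, Integer> actual = counts(batches);
    List<String> problems = new ArrayList<>();
    for (Map.Entry<String, Integer> e : expected.entrySet()) {
      if (actual.getOrDefault(e.getKey(), 0) < e.getValue()) {
        problems.add("missing " + e.getKey());
      }
    }
    for (Map.Entry<String, Integer> e : actual.entrySet()) {
      if (e.getValue() > expected.getOrDefault(e.getKey(), 0)) {
        problems.add("extra " + e.getKey());
      }
    }
    return problems;
  }

  public static void main(String[] args) {
    List<String> grouped = Arrays.asList("r1", "r2", "r3");
    System.out.println(diff(grouped, Arrays.asList(Arrays.asList("r1", "r2")))); // [missing r3]
    System.out.println(diff(grouped, Arrays.asList(Arrays.asList("r3"), Arrays.asList("r1", "r2")))); // []
  }
}
```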

However, when the PCollection is created using just "Create.of(listOfRow)",
the results are always correct.

Regards,
Rahul


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread Robert Bradshaw
Could you clarify what you mean by "inconsistent" and "incorrect"? Are
elements missing/duplicated, or just batched differently?


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-08 Thread rahul patwari
I have only run it on the Direct runner. I will run it on other runners and
let you know the results.
I am not setting --streaming when executing.


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-08 Thread Lukasz Cwik
Have you tried running this on more than one runner (e.g. Dataflow, Flink,
Direct)?

Are you setting --streaming when executing?


Inconsistent Results with GroupIntoBatches PTransform

2019-08-08 Thread rahul patwari
Hi,

I am getting inconsistent results when using the GroupIntoBatches PTransform.
I am using the Create.of() PTransform to create a PCollection from in-memory
data. The issue occurs when a coder is provided to Create.of().
If the coder is not provided, the results are consistent and correct (maybe
this is just a coincidence and the problem lies elsewhere).
If the batch size is 1, the results are always consistent.
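For reasoning about expected output, a simplified single-key model of batching can help. This is a hypothetical in-memory sketch (BatchModel is not Beam's GroupIntoBatches implementation): it ignores windows, timers, state backends, and parallelism, and simply buffers elements, emits a batch whenever batchSize elements have accumulated, and flushes any remainder at the end of the input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchModel {
  // Buffers elements for a single key and emits a batch once batchSize elements
  // have accumulated; any remainder is flushed at the end of the input.
  static List<List<String>> batch(List<String> elements, int batchSize) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be >= 1");
    }
    List<List<String>> batches = new ArrayList<>();
    List<String> buffer = new ArrayList<>();
    for (String element : elements) {
      buffer.add(element);
      if (buffer.size() >= batchSize) {
        batches.add(buffer);
        buffer = new ArrayList<>();
      }
    }
    if (!buffer.isEmpty()) {
      batches.add(buffer);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList("r1", "r2", "r3", "r4", "r5");
    System.out.println(batch(rows, 2)); // [[r1, r2], [r3, r4], [r5]]
    System.out.println(batch(rows, 1)); // [[r1], [r2], [r3], [r4], [r5]]
  }
}
```

In this model, a batch size of 1 emits every element as its own batch, so nothing is carried over between elements.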

I am not sure whether this is an issue with serialization/deserialization,
GroupIntoBatches, or the Create.of() PTransform.

The Java code, expected correct results, and inconsistent results are
available at https://github.com/rahul8383/beam-examples

Thanks,
Rahul