Re: Inconsistent Results with GroupIntoBatches PTransform
Hi Rahul,

I cannot tell for sure. The fix was applied in runners-core, so technically multiple runners could have been affected. A runner is affected if, and only if, it uses something that depends on hashCode() of StateTag (or StateSpec) and the user uses a Coder for that state that doesn't correctly implement hashCode() and equals(); SchemaCoder is one such example.

After a few greps through the repository, I think the Dataflow runner might be (more or less) affected by this as well (someone from the Dataflow team could confirm or disprove that better than me). The possibly affected code is in WindmillStateReader.java, which uses a ConcurrentHashMap with StateTag as the key. I'm not able to tell the consequences of that.

I didn't find any obvious uses of HashMap or HashSet of StateTags in the other runners, but that doesn't mean there really aren't any. :-)

Either way, by using version 2.14.0 you should be safe on all runners.

Jan
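[Editor's note: a minimal plain-Java sketch of the failure mode Jan describes. BadCoder and StateTag below are hypothetical stand-ins, not the real Beam classes; the point is that when a key's equality delegates to a coder that does not override equals()/hashCode(), a logically identical key built later cannot find the stored state.]

```java
import java.util.HashMap;
import java.util.Map;

public class CoderHashCodeDemo {
    // Stand-in for a Coder that does NOT override equals()/hashCode():
    // two logically identical instances compare only by object identity.
    static class BadCoder {
        final String schema;
        BadCoder(String schema) { this.schema = schema; }
    }

    // Stand-in for a StateTag whose equality depends on its coder's equality.
    static class StateTag {
        final String id;
        final BadCoder coder;
        StateTag(String id, BadCoder coder) { this.id = id; this.coder = coder; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof StateTag)) return false;
            StateTag t = (StateTag) o;
            // BadCoder falls back to identity equality, so this is false
            // for any coder instance other than the original one.
            return id.equals(t.id) && coder.equals(t.coder);
        }
        @Override public int hashCode() { return 31 * id.hashCode() + coder.hashCode(); }
    }

    // Store state under one tag, then look it up with a logically equal tag
    // built from a *different* coder instance.
    static String lookup() {
        Map<StateTag, String> state = new HashMap<>();
        state.put(new StateTag("buffer", new BadCoder("rowSchema")), "batch-1");
        return state.get(new StateTag("buffer", new BadCoder("rowSchema")));
    }

    public static void main(String[] args) {
        // The lookup misses, so the buffered state is silently "lost".
        System.out.println(lookup()); // null
    }
}
```

The same lookup with a coder that implements value-based equals()/hashCode() would succeed, which is consistent with the fix landing in runners-core rather than in any one runner.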
Re: Inconsistent Results with GroupIntoBatches PTransform
Hi Jan,

I was using Beam 2.13.0. I have upgraded to 2.14.0 and the results are now always correct. No more inconsistencies.

Does BEAM-7269 affect all the runners?

Thanks,
Rahul
Re: Inconsistent Results with GroupIntoBatches PTransform
Hi Rahul,

What version of Beam are you using? There was a bug [1], which was fixed in 2.14.0, that could cause what you observe.

Jan

[1] https://issues.apache.org/jira/browse/BEAM-7269
Re: Inconsistent Results with GroupIntoBatches PTransform
Hi Robert,

When the PCollection is created using Create.of(listOfRow).withCoder(RowCoder.of(schema)), I am getting inconsistent results. By "inconsistent", I mean that the result is incorrect sometimes (most of the time). By "incorrect", I mean that elements are missing. The elements are not duplicated, and they are not batched differently.

I have used System.identityHashCode(this) to convert the PCollection<Row> to a PCollection<KV<Integer, Row>> in order to apply the stateful ParDo (GroupIntoBatches), as per your suggestion in this thread <https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>.

To verify the result, I have used GroupByKey, which should give the same result as GroupIntoBatches for my case.

However, when the PCollection is created using Create.of(listOfRow), the results are always correct.

Regards,
Rahul
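[Editor's note: a quick stdlib sketch of the System.identityHashCode behavior relied on above. It is stable for a given object across calls, but it ignores equals()/hashCode() overrides, so two equal values generally do not share an identity hash code.]

```java
public class IdentityHashCodeDemo {
    // identityHashCode is deterministic for the lifetime of a given object.
    static boolean stableForSameInstance(Object o) {
        return System.identityHashCode(o) == System.identityHashCode(o);
    }

    public static void main(String[] args) {
        Object instance = new Object();
        System.out.println(stableForSameInstance(instance)); // true

        String a = new String("row");
        String b = new String("row");
        // Equal by value, yet identityHashCode ignores the String.hashCode()
        // override, so a and b will almost certainly hash differently.
        System.out.println(a.equals(b)); // true
    }
}
```

This is why keying by identityHashCode of the DoFn instance is a common trick for stateful ParDo: every element processed by the same instance gets the same key, without depending on element equality.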
Re: Inconsistent Results with GroupIntoBatches PTransform
Could you clarify what you mean by "inconsistent" and "incorrect"? Are elements missing or duplicated, or just batched differently?
Re: Inconsistent Results with GroupIntoBatches PTransform
I only ran on the Direct runner. I will run on the other runners and let you know the results.

I am not setting --streaming when executing.
Re: Inconsistent Results with GroupIntoBatches PTransform
Have you tried running this on more than one runner (e.g. Dataflow, Flink, Direct)?

Are you setting --streaming when executing?
Inconsistent Results with GroupIntoBatches PTransform
Hi,

I am getting inconsistent results when using the GroupIntoBatches PTransform.

I am using the Create.of() PTransform to create a PCollection from in-memory data. When a coder is given with the Create.of() PTransform, I am facing the issue. If the coder is not provided, the results are consistent and correct (maybe this is just a coincidence and the problem is somewhere else). If the batch size is 1, the results are always consistent.

I am not sure whether this is an issue with serialization/deserialization, GroupIntoBatches, or the Create.of() PTransform.

The Java code, expected correct results, and inconsistent results are available at https://github.com/rahul8383/beam-examples

Thanks,
Rahul
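[Editor's note: for readers unfamiliar with GroupIntoBatches, its per-key semantics amount to splitting a key's values into groups of at most the configured batch size. The hypothetical helper below is a plain-Java sketch of that contract, not the Beam implementation; it also illustrates why a batch size of 1 sidesteps the bug, since no element ever waits in buffered state.]

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingDemo {
    // Split values into batches of at most batchSize, preserving order.
    static <T> List<List<T>> intoBatches(List<T> values, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < values.size(); i += batchSize) {
            int end = Math.min(i + batchSize, values.size());
            batches.add(new ArrayList<>(values.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 2, 3, 4, 5);
        System.out.println(intoBatches(values, 2)); // [[1, 2], [3, 4], [5]]
        // With batch size 1, every element is emitted immediately as its own
        // batch, so nothing sits in buffered state between elements; this
        // matches the observation that batch size 1 never showed the issue.
        System.out.println(intoBatches(values, 1)); // [[1], [2], [3], [4], [5]]
    }
}
```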