Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Alex Koay
Finally figured out the issue.
Can confirm that the kafka_taxi job is working as expected now.
The issue was that I ran the Dataflow job with an invalid experiments flag
(runner_v2 instead of use_runner_v2), and the logs (on 2.29) said I was using
Runner v2 even though it seems I wasn't.
Setting the correct flag fixes the issue (and I now see the correctly
expanded transforms in the graph).
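
For anyone hitting the same thing, the working options look roughly like this
(a minimal sketch; the project, region, and bucket values are placeholders):

    from apache_beam.options.pipeline_options import PipelineOptions

    # The experiment must be named "use_runner_v2"; "runner_v2" alone is not it.
    options = PipelineOptions([
        "--runner=DataflowRunner",
        "--project=my-project",                # placeholder
        "--region=us-central1",                # placeholder
        "--temp_location=gs://my-bucket/tmp",  # placeholder
        "--streaming",
        "--experiments=use_runner_v2",
    ])
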
Thanks for your help Cham!

Cheers
Alex

On Thu, Jun 3, 2021 at 1:07 AM Chamikara Jayalath 
wrote:

> Can you mention the Job Logs you see in the Dataflow Cloud Console page
> for your job ? Can you also mention the pipeline and configs you used for
> Dataflow (assuming it's different from what's given in the example) ?
> Make sure that you used Dataflow Runner v2 (as given in the example).
> Are you providing null keys by any chance ? There's a known issue related
> to that (but if you are just running the example, it should generate
> appropriate keys).
>
> Unfortunately for actually debugging your job, I need a Dataflow customer
> support ticket.
>
> Thanks,
> Cham
>
> On Wed, Jun 2, 2021 at 9:45 AM Alex Koay  wrote:
>
>> CC-ing Chamikara as he got omitted from the reply all I did earlier.
>>
>> On Thu, Jun 3, 2021 at 12:43 AM Alex Koay  wrote:
>>
>>> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
>>> upon several threads saying so.
>>>
>>> On Dataflow, I've encountered a few different kinds of issues.
>>> 1. For the kafka_taxi example, the pipeline would start, the PubSub to
>>> Kafka would run, but nothing gets read from Kafka (this seems to get
>>> expanded, as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
>>> sub-transforms).
>>> 2. For the snippet I shared above, I would vary it either with a "log"
>>> transform or a direct "write" back to Kafka. Neither seems to work (and the
>>> steps don't get expanded unlike the kafka_taxi example). With the "write"
>>> step, I believe it didn't get captured in the Dataflow graph a few times.
>>> 3. No errors appear in both Job Logs and Worker Logs, except for one
>>> message emitted from the "log" step if that happens.
>>>
>>> All this is happening while I am producing ~4 messages/sec in Kafka. I
>>> can verify that Kafka is working normally remotely and all (ran into some
>>> issues setting it up).
>>> I've also tested the KafkaIO.read transform in Java and can confirm that
>>> it works as expected.
>>>
>>> As an aside, I put together an ExternalTransform for MqttIO which you
>>> can find here:
>>> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
>>> I can confirm that it works in batch mode, but given that I couldn't get
>>> Kafka to work with Dataflow, I don't have much confidence in getting this
>>> to work.
>>>
>>> Thanks for your help.
>>>
>>> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
>>> wrote:
>>>
 What error did you run into with Dataflow ? Did you observe any errors
 in worker logs ?
 If you follow the steps given in the example here, it should work. Make sure
 Dataflow workers have access to Kafka bootstrap
 servers you provide.

 Portable DirectRunner currently doesn't support streaming mode so you
 need to convert your pipeline to a batch pipeline and provide
 'max_num_records' or 'max_read_time' to convert the Kafka source to a batch
 source.
 This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.

 Also portable runners (Flink, Spark etc.) have a known issue related to
 SDF checkpointing in streaming mode which results in messages not being
 pushed to subsequent steps. This is tracked in
 https://issues.apache.org/jira/browse/BEAM-11998.

 Thanks,
 Cham

 On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:

> /cc @Boyuan Zhang for kafka and @Chamikara Jayalath for multi-language, who
> might be able to help.
>
> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>
>> Hi all,
>>
>> I have created a simple snippet as such:
>>
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> import logging
>> logging.basicConfig(level=logging.WARNING)
>>
>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
>>                                             "--streaming"])) as p:
>>     (
>>         p
>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>>                                   topics=["topic"])
>>         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>>     )
>>
>> I've set up a Kafka single node similar to the kafka_taxi README, and
>> run this both on DirectRunner and DataflowRunner but it doesn't work. 
>> What
>> I mean by this is that the Transform seems to be capturing data, but
>> doesn't pass it on to subsequent transforms.

Re: Is there a way (settings) to limit the number of elements per worker machine

2021-06-02 Thread OrielResearch Eila Arich-Landkof
Hi Robert,
Thank you. I usually work with the custom worker configuration options.
I will customize them to a low number of cores with large memory and see if
it solves my problem.

Thanks so much,
—
Eila
www.orielresearch.com
https://www.meetup.com/Deep-Learning-In-Production 


Sent from my iPhone

> On Jun 2, 2021, at 2:10 PM, Robert Bradshaw  wrote:
> 
> If you want to control the total number of elements being processed
> across all workers at a time, you can do this by assigning random keys
> of the form RandomInteger() % TotalDesiredConcurrency followed by a
> GroupByKey.
> 
> If you want to control the number of elements being processed in
> parallel per VM, you can use the fact that Dataflow assigns one work
> item per core, so an n1-standard-4 would process 4 elements in
> parallel, an n1-highmem-2 would process 2 elements in parallel, etc.
> 
> You could also control this explicitly by using a global (per worker)
> semaphore in your code. If you do this you may want to precede your
> rate-limited DoFn with a Reshuffle to ensure fair (and dynamic) work
> distribution. This should be much easier than trying to coordinate
> multiple parallel pipelines.
> 
>> On Fri, May 28, 2021 at 5:16 AM Eila Oriel Research
>>  wrote:
>> 
>> Thanks Robert.
>> I found the following explanation for the number of threads for 4 cores:
>> You have 4 CPU sockets, each CPU can have, up to, 12 cores and each core can 
>> have two threads. Your max thread count is, 4 CPU x 12 cores x 2 threads per 
>> core, so 12 x 4 x 2 is 96
>> Can I limit the threads using the pipeline options in some way? 10-20 
>> elements per worker will work for me.
>> 
>> My current practice to work around that issue is to limit the number of 
>> elements in each dataflow pipeline (providing ~10 elements for each pipeline)
>> Once I have completed around 200 elements processing = 20 pipelines (google 
>> does not allow more than 25 dataflow pipelines per region) with 10 elements 
>> each, I am launching the next 20 pipelines.
>> 
>> This is of course missing the benefit of serverless.
>> 
>> Any idea, how to work around this?
>> 
>> Best,
>> Eila
>> 
>> 
>>> On Mon, May 17, 2021 at 1:27 PM Robert Bradshaw  wrote:
>>> 
>>> Note that workers generally process one element per thread at a time. The 
>>> number of threads defaults to the number of cores of the VM that you're 
>>> using.
>>> 
>>> On Mon, May 17, 2021 at 10:18 AM Brian Hulette  wrote:
 
 What type of files are you reading? If they can be split and read by 
 multiple workers this might be a good candidate for a Splittable DoFn 
 (SDF).
 
 Brian
 
 On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research 
  wrote:
> 
> Hi,
> I am running out of resources on the worker machines.
> The reasons are:
> 1. Every pcollection is a reference to a LARGE file that is copied into 
> the worker
> 2. The worker makes calculations on the copied file using a software 
> library that consumes memory / storage / compute resources
> 
> I have changed the workers' CPUs and memory size. At some point, I am 
> running out of resources with this method as well
> I am looking to limit the number of pCollection / elements that are being 
> processed in parallel on each worker at a time.
> 
> Many thanks for any advice,
> Best wishes,
> --
> Eila
> 
> Meetup
>> 
>> 
>> 
>> --
>> Eila
>> 
>> Meetup


Re: Is there a way (settings) to limit the number of elements per worker machine

2021-06-02 Thread Vincent Marquez
On Wed, Jun 2, 2021 at 11:27 AM Robert Bradshaw  wrote:

> On Wed, Jun 2, 2021 at 11:18 AM Vincent Marquez
>  wrote:
> >
> > On Wed, Jun 2, 2021 at 11:11 AM Robert Bradshaw 
> wrote:
> >>
> >> If you want to control the total number of elements being processed
> >> across all workers at a time, you can do this by assigning random keys
> >> of the form RandomInteger() % TotalDesiredConcurrency followed by a
> >> GroupByKey.
> >>
> >> If you want to control the number of elements being processed in
> >> parallel per VM, you can use the fact that Dataflow assigns one work
> >> item per core, so an n1-standard-4 would process 4 elements in
> >> parallel, an n1-highmem-2 would process 2 elements in parallel, etc.
> >>
> >> You could also control this explicitly by using a global (per worker)
> >> semaphore in your code. If you do this you may want to precede your
> >> rate-limited DoFn with a Reshuffle to ensure fair (and dynamic) work
> >> distribution. This should be much easier than trying to coordinate
> >> multiple parallel pipelines.
> >>
> >
> > Is there a risk here of having an OOM error due to 'build up' of in
> memory elements from a streaming input?  Or do the runners have some
> concept of throttling bundles based on progress of stages further down the
> pipeline?
>
> For streaming pipelines, hundreds of threads (aka work items) are
> allocated for each worker, so limiting the number of concurrent items
> per worker is harder there.
>
>
Hmm, I did notice this today, that many many many DoFns are instantiated in
a streaming job compared to how many I expected.   This seems like it would
cause all sorts of problems.  For instance, if one were to use the readAll
for say JDBC or Redis or any number of connectors, each of which sets up
connections to some endpoint, a single worker could have hundreds or
thousands of JDBC connections?  I would think this would definitely make
some of the readAll transforms less usable in a streaming pipeline if
scaling out the number of workers would overload the source machines.

Is this behavior documented somewhere?  Is this true for all runners?

--Vincent




> >> On Fri, May 28, 2021 at 5:16 AM Eila Oriel Research
> >>  wrote:
> >> >
> >> > Thanks Robert.
> >> > I found the following explanation for the number of threads for 4
> cores:
> >> > You have 4 CPU sockets, each CPU can have, up to, 12 cores and each
> core can have two threads. Your max thread count is, 4 CPU x 12 cores x 2
> threads per core, so 12 x 4 x 2 is 96
> >> > Can I limit the threads using the pipeline options in some way? 10-20
> elements per worker will work for me.
> >> >
> >> > My current practice to work around that issue is to limit the number
> of elements in each dataflow pipeline (providing ~10 elements for each
> pipeline)
> >> > Once I have completed around 200 elements processing = 20 pipelines
> (google does not allow more than 25 dataflow pipelines per region) with 10
> elements each, I am launching the next 20 pipelines.
> >> >
> >> > This is of course missing the benefit of serverless.
> >> >
> >> > Any idea, how to work around this?
> >> >
> >> > Best,
> >> > Eila
> >> >
> >> >
> >> > On Mon, May 17, 2021 at 1:27 PM Robert Bradshaw 
> wrote:
> >> >>
> >> >> Note that workers generally process one element per thread at a
> time. The number of threads defaults to the number of cores of the VM that
> you're using.
> >> >>
> >> >> On Mon, May 17, 2021 at 10:18 AM Brian Hulette 
> wrote:
> >> >>>
> >> >>> What type of files are you reading? If they can be split and read
> by multiple workers this might be a good candidate for a Splittable DoFn
> (SDF).
> >> >>>
> >> >>> Brian
> >> >>>
> >> >>> On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research <
> e...@orielresearch.org> wrote:
> >> 
> >>  Hi,
> >>  I am running out of resources on the worker machines.
> >>  The reasons are:
> >>  1. Every pcollection is a reference to a LARGE file that is copied
> into the worker
> >>  2. The worker makes calculations on the copied file using a
> software library that consumes memory / storage / compute resources
> >> 
> >>  I have changed the workers' CPUs and memory size. At some point, I
> am running out of resources with this method as well
> >>  I am looking to limit the number of pCollection / elements that
> are being processed in parallel on each worker at a time.
> >> 
> >>  Many thanks for any advice,
> >>  Best wishes,
> >>  --
> >>  Eila
> >> 
> >>  Meetup
> >> >
> >> >
> >> >
> >> > --
> >> > Eila
> >> >
> >> > Meetup
> >
> >
> >
> > ~Vincent
>


Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Brian Hulette
> One thing that's been on the back burner for a long time is making
CoderProperties into a CoderTester like Guava's EqualityTester.

Reuven's point still applies here though. This issue is not due to a bug in
SchemaCoder, it's a problem with the Row we gave SchemaCoder to encode. I'm
assuming a CoderTester would require manually generating inputs right?
These input Rows represent an illegal state that we wouldn't test with.
(That being said I like the idea of a CoderTester in general)

Brian

On Wed, Jun 2, 2021 at 12:11 PM Kenneth Knowles  wrote:

> Mutability checking might catch that.
>
> I meant to suggest not putting the check in the pipeline, but offering a
> testing discipline that will catch such issues. One thing that's been on
> the back burner for a long time is making CoderProperties into a
> CoderTester like Guava's EqualityTester. Then it can run through all the
> properties without a user setting up test suites. Downside is that the test
> failure signal gets aggregated.
>
> Kenn
>
> On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette  wrote:
>
>> Could the DirectRunner just do an equality check whenever it does an
>> encode/decode? It sounds like it's already effectively performing
>> a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
>> the equality check.
>>
>> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax  wrote:
>>
>>> There is no bug in the Coder itself, so that wouldn't catch it. We could
>>> insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
>>> the Direct runner already does an encode/decode before that ParDo, then
>>> that would have fixed the problem before we could see it.
>>>
>>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles  wrote:
>>>
 Would it be caught by CoderProperties?

 Kenn

 On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:

> I don't think this bug is schema specific - we created a Java object
> that is inconsistent with its encoded form, which could happen to any
> transform.
>
> This does seem to be a gap in DirectRunner testing though. It also
> makes it hard to test using PAssert, as I believe that puts everything in a
> side input, forcing an encoding/decoding.
>
> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette 
> wrote:
>
>> +dev 
>>
>> > I bet the DirectRunner is encoding and decoding in between, which
>> fixes the object.
>>
>> Do we need better testing of schema-aware (and potentially other
>> built-in) transforms in the face of fusion to root out issues like this?
>>
>> Brian
>>
>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <
>> matthew.ouy...@gmail.com> wrote:
>>
>>> I have some other work-related things I need to do this week, so I
>>> will likely report back on this over the weekend.  Thank you for the
>>> explanation.  It makes perfect sense now.
>>>
>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>>>
 Some more context - the problem is that RenameFields outputs (in
 this case) Java Row objects that are inconsistent with the actual 
 schema.
 For example if you have the following schema:

 Row {
   field1: Row {
     field2: string
   }
 }

 And rename field1.field2 -> renamed, you'll get the following schema

 Row {
   field1: Row {
     renamed: string
   }
 }

 However the Java object for the _nested_ row will return the old
 schema if getSchema() is called on it. This is because we only update 
 the
 schema on the top-level row.

 I think this explains why your test works in the direct runner. If
 the row ever goes through an encode/decode path, it will come back 
 correct.
 The original incorrect Java objects are no longer around, and new
 (consistent) objects are constructed from the raw data and the 
 PCollection
 schema. Dataflow tends to fuse ParDos together, so the following ParDo 
 will
 see the incorrect Row object. I bet the DirectRunner is encoding and
 decoding in between, which fixes the object.

 You can validate this theory by forcing a shuffle after
 RenameFields using Reshuffle. It should fix the issue. If it does, let me
 know and I'll work on a fix to RenameFields.

 On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:

> Aha, yes this is indeed another bug in the transform. The schema is
> set on the top-level Row but not on any nested rows.
>
> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
> matthew.ouy...@gmail.com> wrote:
>
>> Thank you everyone for your input.  I believe it will be easiest
>> to respond to all feedback in a single message rather than messages per person.

Re: Allyship workshops for open source contributors

2021-06-02 Thread Aizhamal Nurmamat kyzy
>
> If we have a good number of people who express interest in this thread, I
> will set up training for the Airflow community.
>

I meant Beam ^^' I am organizing it for the Airflow community as well.


Allyship workshops for open source contributors

2021-06-02 Thread Aizhamal Nurmamat kyzy
Hi Beamers,

Would this community be interested in taking the Allyship Training? It
requires a 90min commitment for remote session learning. If we have a good
number of people who express interest in this thread, I will set up
training for the Airflow community. If we don't have the critical mass, I
might invite people from other open source projects, but the format and
learning will be the same!

Here are more details:

The training is led by Dr. Kim Tran (https://www.kimtranphd.com/). It aims
to position people of color and those in solidarity with us to develop the
necessary skills to build bridges across race, ability, gender, sexuality
and class.

Participants will leave:
- With the capacity to identify marginalization in real time in the open
source community
- Knowing how to address and respond to marginalization at individual and
systemic levels
- With a strong, critical understanding of the allyship framework
- With intersectional lenses, examining dynamics around gender, class, ability
and race
- With a toolkit for recognizing and combating marginalization in real time

Format:
- Large groups will engage in a *90 minute*, remote learning session.
- Participants will be capped at 45 to enable engaging interactive
participation and responsiveness to participant questions and concerns.
- Session will include webinar style portion as well as breakouts for
hypothetical scenarios.

If you'd like to participate, please leave +1 under this thread. The thread
will stay open for 1 week.

Thanks,
Aizhamal


Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Kenneth Knowles
Mutability checking might catch that.

I meant to suggest not putting the check in the pipeline, but offering a
testing discipline that will catch such issues. One thing that's been on
the back burner for a long time is making CoderProperties into a
CoderTester like Guava's EqualityTester. Then it can run through all the
properties without a user setting up test suites. Downside is that the test
failure signal gets aggregated.

Kenn

On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette  wrote:

> Could the DirectRunner just do an equality check whenever it does an
> encode/decode? It sounds like it's already effectively performing
> a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
> the equality check.
>
> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax  wrote:
>
>> There is no bug in the Coder itself, so that wouldn't catch it. We could
>> insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
>> the Direct runner already does an encode/decode before that ParDo, then
>> that would have fixed the problem before we could see it.
>>
>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles  wrote:
>>
>>> Would it be caught by CoderProperties?
>>>
>>> Kenn
>>>
>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:
>>>
 I don't think this bug is schema specific - we created a Java object
 that is inconsistent with its encoded form, which could happen to any
 transform.

 This does seem to be a gap in DirectRunner testing though. It also
 makes it hard to test using PAssert, as I believe that puts everything in a
 side input, forcing an encoding/decoding.

 On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette 
 wrote:

> +dev 
>
> > I bet the DirectRunner is encoding and decoding in between, which
> fixes the object.
>
> Do we need better testing of schema-aware (and potentially other
> built-in) transforms in the face of fusion to root out issues like this?
>
> Brian
>
> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <
> matthew.ouy...@gmail.com> wrote:
>
>> I have some other work-related things I need to do this week, so I
>> will likely report back on this over the weekend.  Thank you for the
>> explanation.  It makes perfect sense now.
>>
>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>>
>>> Some more context - the problem is that RenameFields outputs (in
>>> this case) Java Row objects that are inconsistent with the actual 
>>> schema.
>>> For example if you have the following schema:
>>>
>>> Row {
>>>   field1: Row {
>>>     field2: string
>>>   }
>>> }
>>>
>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>
>>> Row {
>>>   field1: Row {
>>>     renamed: string
>>>   }
>>> }
>>>
>>> However the Java object for the _nested_ row will return the old
>>> schema if getSchema() is called on it. This is because we only update 
>>> the
>>> schema on the top-level row.
>>>
>>> I think this explains why your test works in the direct runner. If
>>> the row ever goes through an encode/decode path, it will come back 
>>> correct.
>>> The original incorrect Java objects are no longer around, and new
>>> (consistent) objects are constructed from the raw data and the 
>>> PCollection
>>> schema. Dataflow tends to fuse ParDos together, so the following ParDo 
>>> will
>>> see the incorrect Row object. I bet the DirectRunner is encoding and
>>> decoding in between, which fixes the object.
>>>
>>> You can validate this theory by forcing a shuffle after RenameFields
>>> using Reshuffle. It should fix the issue. If it does, let me know and I'll
>>> work on a fix to RenameFields.
>>>
>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:
>>>
 Aha, yes this is indeed another bug in the transform. The schema is
 set on the top-level Row but not on any nested rows.

 On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
 matthew.ouy...@gmail.com> wrote:

> Thank you everyone for your input.  I believe it will be easiest
> to respond to all feedback in a single message rather than messages 
> per
> person.
>
>- NeedsRunner - The tests are run eventually, so obviously all
>good on my end.  I was trying to run the smallest subset of test 
> cases
>possible and didn't venture beyond `gradle test`.
>- Stack Trace - There wasn't any unfortunately because no
>exception thrown in the code.  The Beam Row was translated into a 
> BQ
>TableRow and an insertion was attempted.  The error "message" was 
> part of
>the response JSON that came back as a result of a request against 
> the BQ
>API.

Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Brian Hulette
Could the DirectRunner just do an equality check whenever it does an
encode/decode? It sounds like it's already effectively performing
a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
the equality check.

On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax  wrote:

> There is no bug in the Coder itself, so that wouldn't catch it. We could
> insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
> the Direct runner already does an encode/decode before that ParDo, then
> that would have fixed the problem before we could see it.
>
> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles  wrote:
>
>> Would it be caught by CoderProperties?
>>
>> Kenn
>>
>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:
>>
>>> I don't think this bug is schema specific - we created a Java object
>>> that is inconsistent with its encoded form, which could happen to any
>>> transform.
>>>
>>> This does seem to be a gap in DirectRunner testing though. It also makes
>>> it hard to test using PAssert, as I believe that puts everything in a side
>>> input, forcing an encoding/decoding.
>>>
>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette 
>>> wrote:
>>>
 +dev 

 > I bet the DirectRunner is encoding and decoding in between, which
 fixes the object.

 Do we need better testing of schema-aware (and potentially other
 built-in) transforms in the face of fusion to root out issues like this?

 Brian

 On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang 
 wrote:

> I have some other work-related things I need to do this week, so I
> will likely report back on this over the weekend.  Thank you for the
> explanation.  It makes perfect sense now.
>
> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>
>> Some more context - the problem is that RenameFields outputs (in this
>> case) Java Row objects that are inconsistent with the actual schema.
>> For example if you have the following schema:
>>
>> Row {
>>   field1: Row {
>>     field2: string
>>   }
>> }
>>
>> And rename field1.field2 -> renamed, you'll get the following schema
>>
>> Row {
>>   field1: Row {
>>     renamed: string
>>   }
>> }
>>
>> However the Java object for the _nested_ row will return the old
>> schema if getSchema() is called on it. This is because we only update the
>> schema on the top-level row.
>>
>> I think this explains why your test works in the direct runner. If
>> the row ever goes through an encode/decode path, it will come back 
>> correct.
>> The original incorrect Java objects are no longer around, and new
>> (consistent) objects are constructed from the raw data and the 
>> PCollection
>> schema. Dataflow tends to fuse ParDos together, so the following ParDo 
>> will
>> see the incorrect Row object. I bet the DirectRunner is encoding and
>> decoding in between, which fixes the object.
>>
>> You can validate this theory by forcing a shuffle after RenameFields
>> using Reshuffle. It should fix the issue. If it does, let me know and I'll
>> work on a fix to RenameFields.
>>
>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:
>>
>>> Aha, yes this is indeed another bug in the transform. The schema is set
>>> on the top-level Row but not on any nested rows.
>>>
>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>>> matthew.ouy...@gmail.com> wrote:
>>>
 Thank you everyone for your input.  I believe it will be easiest to
 respond to all feedback in a single message rather than messages per 
 person.

- NeedsRunner - The tests are run eventually, so obviously all
good on my end.  I was trying to run the smallest subset of test 
 cases
possible and didn't venture beyond `gradle test`.
- Stack Trace - There wasn't any unfortunately because no
exception thrown in the code.  The Beam Row was translated into a BQ
TableRow and an insertion was attempted.  The error "message" was 
 part of
the response JSON that came back as a result of a request against 
 the BQ
API.
- Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
field0_1.nestedStringField is what I am looking for.
- Info Logging Findings (In Lieu of a Stack Trace)
   - The Beam Schema was as expected with all renames applied.
   - The example I provided was heavily stripped down in order
   to isolate the problem.  My work example, which is a bit impractical
   because it's part of some generic tooling, has 4 levels of nesting and
   also produces the correct output.
   - BigQueryUtils.toTableRow(Row) returns the expected
   

Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Reuven Lax
There is no bug in the Coder itself, so that wouldn't catch it. We could
insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
the Direct runner already does an encode/decode before that ParDo, then
that would have fixed the problem before we could see it.

On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles  wrote:

> Would it be caught by CoderProperties?
>
> Kenn
>
> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:
>
>> I don't think this bug is schema specific - we created a Java object that
>> is inconsistent with its encoded form, which could happen to any transform.
>>
>> This does seem to be a gap in DirectRunner testing though. It also makes
>> it hard to test using PAssert, as I believe that puts everything in a side
>> input, forcing an encoding/decoding.
>>
>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette  wrote:
>>
>>> +dev 
>>>
>>> > I bet the DirectRunner is encoding and decoding in between, which
>>> fixes the object.
>>>
>>> Do we need better testing of schema-aware (and potentially other
>>> built-in) transforms in the face of fusion to root out issues like this?
>>>
>>> Brian
>>>
>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang 
>>> wrote:
>>>
 I have some other work-related things I need to do this week, so I will
 likely report back on this over the weekend.  Thank you for the
 explanation.  It makes perfect sense now.

 On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:

> Some more context - the problem is that RenameFields outputs (in this
> case) Java Row objects that are inconsistent with the actual schema.
> For example if you have the following schema:
>
> Row {
>   field1: Row {
>     field2: string
>   }
> }
>
> And rename field1.field2 -> renamed, you'll get the following schema
>
> Row {
>   field1: Row {
>     renamed: string
>   }
> }
>
> However the Java object for the _nested_ row will return the old
> schema if getSchema() is called on it. This is because we only update the
> schema on the top-level row.
>
> I think this explains why your test works in the direct runner. If the
> row ever goes through an encode/decode path, it will come back correct. 
> The
> original incorrect Java objects are no longer around, and new (consistent)
> objects are constructed from the raw data and the PCollection schema.
> Dataflow tends to fuse ParDos together, so the following ParDo will see 
> the
> incorrect Row object. I bet the DirectRunner is encoding and decoding in
> between, which fixes the object.
>
> You can validate this theory by forcing a shuffle after RenameFields
> using Reshuffle. It should fix the issue. If it does, let me know and I'll
> work on a fix to RenameFields.
>
> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:
>
>> Aha, yes this is indeed another bug in the transform. The schema is set
>> on the top-level Row but not on any nested rows.
>>
>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>> matthew.ouy...@gmail.com> wrote:
>>
>>> Thank you everyone for your input.  I believe it will be easiest to
>>> respond to all feedback in a single message rather than messages per 
>>> person.
>>>
>>>- NeedsRunner - The tests are run eventually, so obviously all
>>>good on my end.  I was trying to run the smallest subset of test 
>>> cases
>>>possible and didn't venture beyond `gradle test`.
>>>- Stack Trace - There wasn't any unfortunately because no
>>>exception thrown in the code.  The Beam Row was translated into a BQ
>>>TableRow and an insertion was attempted.  The error "message" was 
>>> part of
>>>the response JSON that came back as a result of a request against 
>>> the BQ
>>>API.
>>>- Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>field0_1.nestedStringField is what I am looking for.
>>>- Info Logging Findings (In Lieu of a Stack Trace)
>>>   - The Beam Schema was as expected with all renames applied.
>>>   - The example I provided was heavily stripped down in order
>>>   to isolate the problem.  My work example, which is a bit impractical
>>>   because it's part of some generic tooling, has 4 levels of nesting and
>>>   also produces the correct output.
>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow
>>>   in DirectRunner.  In DataflowRunner however, only the top-level 
>>> renames
>>>   were reflected in the TableRow and all renames in the nested 
>>> fields weren't.
>>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values
>>>   and uses the Row.schema to get the field names.  This makes sense 
>>> to me,
>>>   but if a value is actually a Row then its schema appears to be
>>>   inconsistent with the top-level schema.

Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Kenneth Knowles
Would it be caught by CoderProperties?

Kenn

On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:

> I don't think this bug is schema specific - we created a Java object that
> is inconsistent with its encoded form, which could happen to any transform.
>
> This does seem to be a gap in DirectRunner testing though. It also makes
> it hard to test using PAssert, as I believe that puts everything in a side
> input, forcing an encoding/decoding.
>
> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette  wrote:
>
>> +dev 
>>
>> > I bet the DirectRunner is encoding and decoding in between, which fixes
>> the object.
>>
>> Do we need better testing of schema-aware (and potentially other
>> built-in) transforms in the face of fusion to root out issues like this?
>>
>> Brian
>>
>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang 
>> wrote:
>>
>>> I have some other work-related things I need to do this week, so I will
>>> likely report back on this over the weekend.  Thank you for the
>>> explanation.  It makes perfect sense now.
>>>
>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>>>
 Some more context - the problem is that RenameFields outputs (in this
 case) Java Row objects that are inconsistent with the actual schema.
 For example if you have the following schema:

 Row {
   field1: Row {
     field2: string
   }
 }

 And rename field1.field2 -> renamed, you'll get the following schema

 Row {
   field1: Row {
     renamed: string
   }
 }

 However the Java object for the _nested_ row will return the old schema
 if getSchema() is called on it. This is because we only update the schema
 on the top-level row.

 I think this explains why your test works in the direct runner. If the
 row ever goes through an encode/decode path, it will come back correct. The
 original incorrect Java objects are no longer around, and new (consistent)
 objects are constructed from the raw data and the PCollection schema.
 Dataflow tends to fuse ParDos together, so the following ParDo will see the
 incorrect Row object. I bet the DirectRunner is encoding and decoding in
 between, which fixes the object.

 You can validate this theory by forcing a shuffle after RenameFields
 using Reshuffle. It should fix the issue. If it does, let me know and I'll
 work on a fix to RenameFields.

 On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:

> Aha, yes this is indeed another bug in the transform. The schema is set
> on the top-level Row but not on any nested rows.
>
> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
> matthew.ouy...@gmail.com> wrote:
>
>> Thank you everyone for your input.  I believe it will be easiest to
>> respond to all feedback in a single message rather than messages per 
>> person.
>>
>>- NeedsRunner - The tests are run eventually, so obviously all
>>good on my end.  I was trying to run the smallest subset of test cases
>>possible and didn't venture beyond `gradle test`.
>>- Stack Trace - There wasn't any unfortunately because no
>>exception thrown in the code.  The Beam Row was translated into a BQ
>>TableRow and an insertion was attempted.  The error "message" was 
>> part of
>>the response JSON that came back as a result of a request against the 
>> BQ
>>API.
>>- Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>field0_1.nestedStringField is what I am looking for.
>>- Info Logging Findings (In Lieu of a Stack Trace)
>>   - The Beam Schema was as expected with all renames applied.
>>   - The example I provided was heavily stripped down in order to
>>   isolate the problem.  My work example, which is a bit impractical
>>   because it's part of some generic tooling, has 4 levels of nesting and
>>   also produces the correct output.
>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow
>>   in DirectRunner.  In DataflowRunner however, only the top-level 
>> renames
>>   were reflected in the TableRow and all renames in the nested 
>> fields weren't.
>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>>   uses the Row.schema to get the field names.  This makes sense to 
>> me, but if
>>   a value is actually a Row then its schema appears to be 
>> inconsistent with
>>   the top-level schema
>>- My Current Workaround - I forked RenameFields and replaced the
>>attachValues in expand method to be a "deep" rename.  This is 
>> obviously
>>inefficient and I will not be submitting a PR for that.
>>- JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>
>>
>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax  wrote:
>>
>>> This 

Re: Is there a way (settings) to limit the number of elements per worker machine

2021-06-02 Thread Robert Bradshaw
On Wed, Jun 2, 2021 at 11:18 AM Vincent Marquez
 wrote:
>
> On Wed, Jun 2, 2021 at 11:11 AM Robert Bradshaw  wrote:
>>
>> If you want to control the total number of elements being processed
>> across all workers at a time, you can do this by assigning random keys
>> of the form RandomInteger() % TotalDesiredConcurrency followed by a
>> GroupByKey.
>>
>> If you want to control the number of elements being processed in
>> parallel per VM, you can use the fact that Dataflow assigns one work
>> item per core, so an n1-standard-4 would process 4 elements in
>> parallel, an n1-highmem-2 would process 2 elements in parallel, etc.
>>
>> You could also control this explicitly by using a global (per worker)
>> semaphore in your code. If you do this you may want to precede your
>> rate-limited DoFn with a Reshuffle to ensure fair (and dynamic) work
>> distribution. This should be much easier than trying to coordinate
>> multiple parallel pipelines.
>>
>
> Is there a risk here of having an OOM error due to 'build up' of in memory 
> elements from a streaming input?  Or do the runners have some concept of 
> throttling bundles based on progress of stages further down the pipeline?

For streaming pipelines, hundreds of threads (aka work items) are
allocated for each worker, so limiting the number of concurrent items
per worker is harder there.

>> On Fri, May 28, 2021 at 5:16 AM Eila Oriel Research
>>  wrote:
>> >
>> > Thanks Robert.
>> > I found the following explanation for the number of threads for 4 cores:
>> > You have 4 CPU sockets, each CPU can have, up to, 12 cores and each core 
>> > can have two threads. Your max thread count is, 4 CPU x 12 cores x 2 
>> > threads per core, so 12 x 4 x 2 is 96
>> > Can I limit the threads using the pipeline options in some way? 10-20 
>> > elements per worker will work for me.
>> >
>> > My current practice to work around that issue is to limit the number of 
>> > elements in each dataflow pipeline (providing ~10 elements for each 
>> > pipeline)
>> > Once I have completed around 200 elements processing = 20 pipelines 
>> > (google does not allow more than 25 dataflow pipelines per region) with 10 
>> > elements each, I am launching the next 20 pipelines.
>> >
>> > This is ofcourse missing the benefit of serverless.
>> >
>> > Any idea, how to work around this?
>> >
>> > Best,
>> > Eila
>> >
>> >
>> > On Mon, May 17, 2021 at 1:27 PM Robert Bradshaw  
>> > wrote:
>> >>
>> >> Note that workers generally process one element per thread at a time. The 
>> >> number of threads defaults to the number of cores of the VM that you're 
>> >> using.
>> >>
>> >> On Mon, May 17, 2021 at 10:18 AM Brian Hulette  
>> >> wrote:
>> >>>
>> >>> What type of files are you reading? If they can be split and read by 
>> >>> multiple workers this might be a good candidate for a Splittable DoFn 
>> >>> (SDF).
>> >>>
>> >>> Brian
>> >>>
>> >>> On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research 
>> >>>  wrote:
>> 
>>  Hi,
>>  I am running out of resources on the worker machines.
>>  The reasons are:
>>  1. Every pcollection is a reference to a LARGE file that is copied into 
>>  the worker
>>  2. The worker makes calculations on the copied file using a software 
>>  library that consumes memory / storage / compute resources
>> 
>>  I have changed the workers' CPUs and memory size. At some point, I am 
>>  running out of resources with this method as well
>>  I am looking to limit the number of pCollection / elements that are 
>>  being processed in parallel on each worker at a time.
>> 
>>  Many thanks for any advice,
>>  Best wishes,
>>  --
>>  Eila
>> 
>>  Meetup
>> >
>> >
>> >
>> > --
>> > Eila
>> >
>> > Meetup
>
>
>
> ~Vincent


Re: Is there a way (settings) to limit the number of elements per worker machine

2021-06-02 Thread Vincent Marquez
On Wed, Jun 2, 2021 at 11:11 AM Robert Bradshaw  wrote:

> If you want to control the total number of elements being processed
> across all workers at a time, you can do this by assigning random keys
> of the form RandomInteger() % TotalDesiredConcurrency followed by a
> GroupByKey.
>
> If you want to control the number of elements being processed in
> parallel per VM, you can use the fact that Dataflow assigns one work
> item per core, so an n1-standard-4 would process 4 elements in
> parallel, an n1-highmem-2 would process 2 elements in parallel, etc.
>
> You could also control this explicitly by using a global (per worker)
> semaphore in your code. If you do this you may want to precede your
> rate-limited DoFn with a Reshuffle to ensure fair (and dynamic) work
> distribution. This should be much easier than trying to coordinate
> multiple parallel pipelines.
>
>
Is there a risk here of having an OOM error due to 'build up' of in memory
elements from a streaming input?  Or do the runners have some concept of
throttling bundles based on progress of stages further down the pipeline?




> On Fri, May 28, 2021 at 5:16 AM Eila Oriel Research
>  wrote:
> >
> > Thanks Robert.
> > I found the following explanation for the number of threads for 4 cores:
> > You have 4 CPU sockets, each CPU can have, up to, 12 cores and each core
> can have two threads. Your max thread count is, 4 CPU x 12 cores x 2
> threads per core, so 12 x 4 x 2 is 96
> > Can I limit the threads using the pipeline options in some way? 10-20
> elements per worker will work for me.
> >
> > My current practice to work around that issue is to limit the number of
> elements in each dataflow pipeline (providing ~10 elements for each
> pipeline)
> > Once I have completed around 200 elements processing = 20 pipelines
> (google does not allow more than 25 dataflow pipelines per region) with 10
> elements each, I am launching the next 20 pipelines.
> >
> > This is of course missing the benefit of serverless.
> >
> > Any idea, how to work around this?
> >
> > Best,
> > Eila
> >
> >
> > On Mon, May 17, 2021 at 1:27 PM Robert Bradshaw 
> wrote:
> >>
> >> Note that workers generally process one element per thread at a time.
> The number of threads defaults to the number of cores of the VM that you're
> using.
> >>
> >> On Mon, May 17, 2021 at 10:18 AM Brian Hulette 
> wrote:
> >>>
> >>> What type of files are you reading? If they can be split and read by
> multiple workers this might be a good candidate for a Splittable DoFn (SDF).
> >>>
> >>> Brian
> >>>
> >>> On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research <
> e...@orielresearch.org> wrote:
> 
>  Hi,
>  I am running out of resources on the worker machines.
>  The reasons are:
>  1. Every pcollection is a reference to a LARGE file that is copied
> into the worker
>  2. The worker makes calculations on the copied file using a software
> library that consumes memory / storage / compute resources
> 
>  I have changed the workers' CPUs and memory size. At some point, I am
> running out of resources with this method as well
>  I am looking to limit the number of pCollection / elements that are
> being processed in parallel on each worker at a time.
> 
>  Many thanks for any advice,
>  Best wishes,
>  --
>  Eila
> 
>  Meetup
> >
> >
> >
> > --
> > Eila
> >
> > Meetup
>


~Vincent


Re: Is there a way (settings) to limit the number of elements per worker machine

2021-06-02 Thread Robert Bradshaw
If you want to control the total number of elements being processed
across all workers at a time, you can do this by assigning random keys
of the form RandomInteger() % TotalDesiredConcurrency followed by a
GroupByKey.
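
A minimal sketch of that keying approach (TOTAL_DESIRED_CONCURRENCY, the
inputs PCollection, and process_element are placeholders):

    import random
    import apache_beam as beam

    TOTAL_DESIRED_CONCURRENCY = 20  # placeholder: at most ~20 groups in flight

    limited = (
        inputs  # assumed: an existing bounded PCollection
        | "AssignRandomKey" >> beam.Map(
            lambda x: (random.randint(0, TOTAL_DESIRED_CONCURRENCY - 1), x))
        | "LimitConcurrency" >> beam.GroupByKey()
        | "Process" >> beam.FlatMap(
            lambda kv: (process_element(x) for x in kv[1]))  # placeholder work
    )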

If you want to control the number of elements being processed in
parallel per VM, you can use the fact that Dataflow assigns one work
item per core, so an n1-standard-4 would process 4 elements in
parallel, an n1-highmem-2 would process 2 elements in parallel, etc.
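
In pipeline options that is just the worker machine type, roughly (the
project, region, and bucket values are placeholders):

    from apache_beam.options.pipeline_options import PipelineOptions

    # One work item per core: n1-highmem-2 gives ~2 elements in parallel per
    # worker, each with a relatively large share of memory.
    options = PipelineOptions([
        "--runner=DataflowRunner",
        "--project=my-project",                # placeholder
        "--region=us-central1",                # placeholder
        "--temp_location=gs://my-bucket/tmp",  # placeholder
        "--worker_machine_type=n1-highmem-2",
    ])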

You could also control this explicitly by using a global (per worker)
semaphore in your code. If you do this you may want to precede your
rate-limited DoFn with a Reshuffle to ensure fair (and dynamic) work
distribution. This should be much easier than trying to coordinate
multiple parallel pipelines.
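
A rough sketch of the semaphore variant (the limit, the inputs PCollection,
and do_expensive_work are placeholders; a process-level semaphore is used
here as an approximation of "per worker"):

    import threading
    import apache_beam as beam

    MAX_IN_FLIGHT_PER_WORKER = 4  # placeholder limit

    class RateLimitedDoFn(beam.DoFn):
        # Class-level semaphore, shared by all instances in one worker process.
        _semaphore = threading.BoundedSemaphore(MAX_IN_FLIGHT_PER_WORKER)

        def process(self, element):
            with RateLimitedDoFn._semaphore:
                result = do_expensive_work(element)  # placeholder expensive call
            yield result

    limited = (
        inputs  # assumed: an existing PCollection
        | "Redistribute" >> beam.Reshuffle()  # precede the DoFn for fair, dynamic distribution
        | "RateLimited" >> beam.ParDo(RateLimitedDoFn())
    )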

On Fri, May 28, 2021 at 5:16 AM Eila Oriel Research
 wrote:
>
> Thanks Robert.
> I found the following explanation for the number of threads for 4 cores:
> You have 4 CPU sockets, each CPU can have, up to, 12 cores and each core can 
> have two threads. Your max thread count is, 4 CPU x 12 cores x 2 threads per 
> core, so 12 x 4 x 2 is 96
> Can I limit the threads using the pipeline options in some way? 10-20 
> elements per worker will work for me.
>
> My current practice to work around that issue is to limit the number of 
> elements in each dataflow pipeline (providing ~10 elements for each pipeline)
> Once I have completed around 200 elements processing = 20 pipelines (google 
> does not allow more than 25 dataflow pipelines per region) with 10 elements 
> each, I am launching the next 20 pipelines.
>
> This is of course missing the benefit of serverless.
>
> Any idea, how to work around this?
>
> Best,
> Eila
>
>
> On Mon, May 17, 2021 at 1:27 PM Robert Bradshaw  wrote:
>>
>> Note that workers generally process one element per thread at a time. The 
>> number of threads defaults to the number of cores of the VM that you're 
>> using.
>>
>> On Mon, May 17, 2021 at 10:18 AM Brian Hulette  wrote:
>>>
>>> What type of files are you reading? If they can be split and read by 
>>> multiple workers this might be a good candidate for a Splittable DoFn (SDF).
>>>
>>> Brian
>>>
>>> On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research 
>>>  wrote:

 Hi,
 I am running out of resources on the worker machines.
 The reasons are:
 1. Every pcollection is a reference to a LARGE file that is copied into 
 the worker
 2. The worker makes calculations on the copied file using a software 
 library that consumes memory / storage / compute resources

 I have changed the workers' CPUs and memory size. At some point, I am 
 running out of resources with this method as well
 I am looking to limit the number of pCollection / elements that are being 
 processed in parallel on each worker at a time.

 Many thanks for any advice,
 Best wishes,
 --
 Eila

 Meetup
>
>
>
> --
> Eila
>
> Meetup


Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Chamikara Jayalath
Can you mention the Job Logs you see in the Dataflow Cloud Console page for
your job ? Can you also mention the pipeline and configs you used for
Dataflow (assuming it's different from what's given in the example) ?
Make sure that you used Dataflow Runner v2 (as given in the example).
Are you providing null keys by any chance ? There's a known issue related
to that (but if you are just running the example, it should generate
appropriate keys).
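
As a quick way to rule that out, keyed test messages can be produced with
something like this (a sketch assuming the kafka-python package, which is not
part of Beam; the broker address and topic are placeholders):

    from kafka import KafkaProducer  # pip install kafka-python (assumed)

    producer = KafkaProducer(bootstrap_servers="localhost:9092")  # placeholder
    # Send an explicitly keyed message so the Beam side never sees a null key.
    producer.send("topic", key=b"some-key", value=b"some-value")
    producer.flush()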

Unfortunately for actually debugging your job, I need a Dataflow customer
support ticket.

Thanks,
Cham

On Wed, Jun 2, 2021 at 9:45 AM Alex Koay  wrote:

> CC-ing Chamikara as he got omitted from the reply all I did earlier.
>
> On Thu, Jun 3, 2021 at 12:43 AM Alex Koay  wrote:
>
>> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
>> upon several threads saying so.
>>
>> On Dataflow, I've encountered a few different kinds of issues.
>> 1. For the kafka_taxi example, the pipeline would start, the PubSub to
>> Kafka would run, but nothing gets read from Kafka (this seems to get
>> expanded, as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
>> sub-transforms).
>> 2. For the snippet I shared above, I would vary it either with a "log"
>> transform or a direct "write" back to Kafka. Neither seems to work (and the
>> steps don't get expanded unlike the kafka_taxi example). With the "write"
>> step, I believe it didn't get captured in the Dataflow graph a few times.
>> 3. No errors appear in both Job Logs and Worker Logs, except for one
>> message emitted from the "log" step if that happens.
>>
>> All this is happening while I am producing ~4 messages/sec in Kafka. I
>> can verify that Kafka is working normally remotely and all (ran into some
>> issues setting it up).
>> I've also tested the KafkaIO.read transform in Java and can confirm that
>> it works as expected.
>>
>> As an aside, I put together an ExternalTransform for MqttIO which you can
>> find here:
>> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
>> I can confirm that it works in batch mode, but given that I couldn't get
>> Kafka to work with Dataflow, I don't have much confidence in getting this
>> to work.
>>
>> Thanks for your help.
>>
>> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
>> wrote:
>>
>>> What error did you run into with Dataflow ? Did you observe any errors
>>> in worker logs ?
>>> If you follow the steps given in the example here, it should work. Make sure
>>> Dataflow workers have access to Kafka bootstrap
>>> servers you provide.
>>>
>>> Portable DirectRunner currently doesn't support streaming mode so you
>>> need to convert your pipeline to a batch pipeline and provide
>>> 'max_num_records' or 'max_read_time' to convert the Kafka source to a batch
>>> source.
>>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>>
>>> Also portable runners (Flink, Spark etc.) have a known issue related to
>>> SDF checkpointing in streaming mode which results in messages not being
>>> pushed to subsequent steps. This is tracked in
>>> https://issues.apache.org/jira/browse/BEAM-11998.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>>>
 /cc @Boyuan Zhang for kafka and @Chamikara Jayalath for multi-language, who
 might be able to help.

 On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:

> Hi all,
>
> I have created a simple snippet as such:
>
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
> import logging
> logging.basicConfig(level=logging.WARNING)
>
> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
>                                             "--streaming"])) as p:
>     (
>         p
>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>                                   topics=["topic"])
>         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>     )
>
> I've set up a Kafka single node similar to the kafka_taxi README, and
> run this both on DirectRunner and DataflowRunner but it doesn't work. What
> I mean by this is that the Transform seems to be capturing data, but
> doesn't pass it on to subsequent transforms.
> With DirectRunner, if I send a non-keyed Kafka message to the server
> it actually crashes (saying that it cannot encode null into a byte[]),
> hence why I believe the transform is actually running.
>
> My main objective really is to create a streaming ExternalTransform
> for MqttIO and SolaceIO (
> https://github.com/SolaceProducts/solace-apache-beam).
> I've implemented the builder and registrars and they work in batch
> mode (with maxNumRecords) but otherwise it fails to read.
>
> With MqttIO, the streaming transform gets stuck waiting for one bundle
> to complete (if I continuously send messages into the MQTT server), and
> after stopping, the bundles finish but nothing gets passed on either.

Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Alex Koay
CC-ing Chamikara as he got omitted from the reply all I did earlier.

On Thu, Jun 3, 2021 at 12:43 AM Alex Koay  wrote:

> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
> upon several threads saying so.
>
> On Dataflow, I've encountered a few different kinds of issues.
> 1. For the kafka_taxi example, the pipeline would start, the PubSub to
> Kafka would run, but nothing gets read from Kafka (this seems to get
> expanded, as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
> sub-transforms).
> 2. For the snippet I shared above, I would vary it either with a "log"
> transform or a direct "write" back to Kafka. Neither seems to work (and the
> steps don't get expanded unlike the kafka_taxi example). With the "write"
> step, I believe it didn't get captured in the Dataflow graph a few times.
> 3. No errors appear in both Job Logs and Worker Logs, except for one
> message emitted from the "log" step if that happens.
>
> All this is happening while I am producing ~4 messages/sec in Kafka. I can
> verify that Kafka is working normally remotely and all (ran into some
> issues setting it up).
> I've also tested the KafkaIO.read transform in Java and can confirm that
> it works as expected.
>
> As an aside, I put together an ExternalTransform for MqttIO which you can
> find here:
> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
> I can confirm that it works in batch mode, but given that I couldn't get
> Kafka to work with Dataflow, I don't have much confidence in getting this
> to work.
>
> Thanks for your help.
>
> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
> wrote:
>
>> What error did you run into with Dataflow ? Did you observe any errors in
>> worker logs ?
>> If you follow the steps given in the example here, it should work. Make sure
>> Dataflow workers have access to Kafka bootstrap
>> servers you provide.
>>
>> Portable DirectRunner currently doesn't support streaming mode so you
>> need to convert your pipeline to a batch pipeline and provide
>> 'max_num_records' or 'max_read_time' to convert the Kafka source to a batch
>> source.
>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>
>> Also portable runners (Flink, Spark etc.) have a known issue related to
>> SDF checkpointing in streaming mode which results in messages not being
>> pushed to subsequent steps. This is tracked in
>> https://issues.apache.org/jira/browse/BEAM-11998.
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>>
>>> /cc @Boyuan Zhang for kafka and @Chamikara Jayalath for multi-language, who
>>> might be able to help.
>>>
>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>>>
 Hi all,

 I have created a simple snippet as such:

 import apache_beam as beam
 from apache_beam.io.kafka import ReadFromKafka
 from apache_beam.options.pipeline_options import PipelineOptions

 import logging
 logging.basicConfig(level=logging.WARNING)

 with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
                                             "--streaming"])) as p:
     (
         p
         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
                                   topics=["topic"])
         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
     )

 I've set up a Kafka single node similar to the kafka_taxi README, and
 run this both on DirectRunner and DataflowRunner but it doesn't work. What
 I mean by this is that the Transform seems to be capturing data, but
 doesn't pass it on to subsequent transforms.
 With DirectRunner, if I send a non-keyed Kafka message to the server it
 actually crashes (saying that it cannot encode null into a byte[]), hence
 why I believe the transform is actually running.

 My main objective really is to create a streaming ExternalTransform for
 MqttIO and SolaceIO (
 https://github.com/SolaceProducts/solace-apache-beam).
 I've implemented the builder and registrars and they work in batch mode
 (with maxNumRecords) but otherwise it fails to read.

 With MqttIO, the streaming transform gets stuck waiting for one bundle
 to complete (if I continuously send messages into the MQTT server), and
 after stopping, the bundles finish but nothing gets passed on either.

 I appreciate any help I can get with this.
 Thanks!

 Cheers
 Alex





Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Alex Koay
Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
upon several threads saying so.

On Dataflow, I've encountered a few different kinds of issues.
1. For the kafka_taxi example, the pipeline would start and the PubSub-to-Kafka
write would run, but nothing gets read from Kafka (the read does seem to get
expanded, as Dataflow shows the KafkaIO.Read + Remove Kafka Metadata
sub-transforms).
2. For the snippet I shared above, I would vary it either with a "log"
transform or a direct "write" back to Kafka (see the sketch after this list).
Neither seems to work, and the steps don't get expanded, unlike in the
kafka_taxi example. With the "write" step, I believe it didn't get captured in
the Dataflow graph a few times.
3. No errors appear in either the Job Logs or the Worker Logs, except for the
occasional message emitted from the "log" step when it fires.
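
For reference, the "write" variant looked roughly like the snippet below. This
is only a minimal sketch: the broker address, the output topic name, and the
omitted Dataflow options (project, region, temp_location, etc.) are
placeholders, and it assumes the default byte-array (de)serializers so the
read output can be passed straight to the write.

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

# The usual Dataflow options (runner, project, region, temp_location, ...)
# are left out here for brevity.
opts = PipelineOptions(["--streaming"])
with beam.Pipeline(options=opts) as p:
    (
        p
        # ReadFromKafka emits (key, value) pairs of bytes by default.
        | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
                                   topics=["topic"])
        # Write the same (key, value) pairs back out to another topic.
        | "write" >> WriteToKafka(
            producer_config={"bootstrap.servers": "localhost:9092"},
            topic="topic-copy")
    )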

All this is happening while I am producing ~4 messages/sec into Kafka. I can
verify remotely that Kafka itself is working normally (I ran into some issues
setting it up initially).
I've also tested the KafkaIO.read transform in Java and can confirm that it
works as expected.
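
For what it's worth, the test producer amounts to something like the minimal
sketch below (assuming the kafka-python client; the actual script may differ).
It always sets a key, since keyless messages are what triggered the null-key
crash on DirectRunner.

import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
for i in range(1000):
    # Keyed messages, so the Beam Kafka read never sees a null key.
    producer.send("topic", key=b"key-%d" % i, value=b"value-%d" % i)
    producer.flush()
    time.sleep(0.25)  # roughly 4 messages/sec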

As an aside, I put together an ExternalTransform for MqttIO which you can
find here: https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
I can confirm that it works in batch mode, but given that I couldn't get
Kafka to work with Dataflow, I don't have much confidence in getting this
to work.
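
To illustrate, the Python side of that wrapper boils down to something like the
sketch below. The URN, the configuration keys, and the expansion service
address are hypothetical stand-ins here, not the actual values from the gist.

import apache_beam as beam
from apache_beam.transforms.external import (
    ExternalTransform, ImplicitSchemaPayloadBuilder)

# Hypothetical URN and config keys; the real ones are defined by the Java
# registrar in the gist, and the expansion service address is an assumption.
MQTT_READ_URN = "beam:external:java:mqtt:read:v1"

with beam.Pipeline() as p:
    (
        p
        | "mqtt_read" >> ExternalTransform(
            MQTT_READ_URN,
            ImplicitSchemaPayloadBuilder({
                "server_uri": "tcp://localhost:1883",
                "topic": "topic",
                "max_num_records": 100,  # only reads with a bound, i.e. batch
            }),
            expansion_service="localhost:8097")
        | "log" >> beam.Map(print)
    )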

Thanks for your help.

On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath 
wrote:

> What error did you run into with Dataflow ? Did you observe any errors in
> worker logs ?
> If you follow the steps given in the example here
> 
> it should work. Make sure Dataflow workers have access to Kafka bootstrap
> servers you provide.
>
> Portable DirectRunner currently doesn't support streaming mode so you need
> to convert your pipeline to a batch pipeline and provide 'max_num_records'
> or 'max_read_time' to convert the Kafka source to a batch source.
> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>
> Also portable runners (Flink, Spark etc.) have a known issue related to
> SDF checkpointing in streaming mode which results in messages not being
> pushed to subsequent steps. This is tracked in
> https://issues.apache.org/jira/browse/BEAM-11998.
>
> Thanks,
> Cham
>
> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>
>> /cc @Boyuan Zhang  for kafka @Chamikara Jayalath
>>  for multi language might be able to help.
>>
>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>>
>>> Hi all,
>>>
>>> I have created a simple snippet as such:
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.kafka import ReadFromKafka
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>> import logging
>>> logging.basicConfig(level=logging.WARNING)
>>>
>>> opts = PipelineOptions(["--runner=DirectRunner", "--streaming"])
>>> with beam.Pipeline(options=opts) as p:
>>>     (
>>>         p
>>>         # Read raw (key, value) records from Kafka and log each one.
>>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>>>                                    topics=["topic"])
>>>         | "log" >> beam.FlatMap(lambda x: [logging.error("%s", str(x))])
>>>     )
>>>
>>> I've set up a Kafka single node similar to the kafka_taxi README, and
>>> run this both on DirectRunner and DataflowRunner but it doesn't work. What
>>> I mean by this is that the Transform seems to be capturing data, but
>>> doesn't pass it on to subsequent transforms.
>>> With DirectRunner, if I send a non-keyed Kafka message to the server it
>>> actually crashes (saying that it cannot encode null into a byte[]), hence
>>> why I believe the transform is actually running.
>>>
>>> My main objective really is to create a streaming ExternalTransform for
>>> MqttIO and SolaceIO (
>>> https://github.com/SolaceProducts/solace-apache-beam).
>>> I've implemented the builder and registrars and they work in batch mode
>>> (with maxNumRecords) but otherwise it fails to read.
>>>
>>> With MqttIO, the streaming transform gets stuck waiting for one bundle
>>> to complete (if I continuously send messages into the MQTT server), and
>>> after stopping, the bundles finish but nothing gets passed on either.
>>>
>>> I appreciate any help I can get with this.
>>> Thanks!
>>>
>>> Cheers
>>> Alex
>>>
>>>
>>>


Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Chamikara Jayalath
What error did you run into with Dataflow? Did you observe any errors in
worker logs?
If you follow the steps given in the example here

it should work. Make sure Dataflow workers have access to Kafka bootstrap
servers you provide.

Portable DirectRunner currently doesn't support streaming mode so you need
to convert your pipeline to a batch pipeline and provide 'max_num_records'
or 'max_read_time' to convert the Kafka source to a batch source.
This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
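
For example, a bounded version of your snippet would look roughly like the
minimal sketch below (broker and topic names are placeholders). Bounding the
source lets the portable DirectRunner run it as a regular batch pipeline.

import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

opts = PipelineOptions(["--runner=DirectRunner"])
with beam.Pipeline(options=opts) as p:
    (
        p
        | "read" >> ReadFromKafka(
            {"bootstrap.servers": "localhost:9092"},
            topics=["topic"],
            # Bounding the source turns it into a batch read; max_read_time
            # is the time-based alternative.
            max_num_records=100)
        | "log" >> beam.Map(lambda kv: logging.error("%s", str(kv)))
    )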

Also portable runners (Flink, Spark etc.) have a known issue related to SDF
checkpointing in streaming mode which results in messages not being pushed
to subsequent steps. This is tracked in
https://issues.apache.org/jira/browse/BEAM-11998.

Thanks,
Cham

On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:

> /cc @Boyuan Zhang  for kafka @Chamikara Jayalath
>  for multi language might be able to help.
>
> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:
>
>> Hi all,
>>
>> I have created a simple snippet as such:
>>
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> import logging
>> logging.basicConfig(level=logging.WARNING)
>>
>> opts = PipelineOptions(["--runner=DirectRunner", "--streaming"])
>> with beam.Pipeline(options=opts) as p:
>>     (
>>         p
>>         # Read raw (key, value) records from Kafka and log each one.
>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>>                                    topics=["topic"])
>>         | "log" >> beam.FlatMap(lambda x: [logging.error("%s", str(x))])
>>     )
>>
>> I've set up a Kafka single node similar to the kafka_taxi README, and run
>> this both on DirectRunner and DataflowRunner but it doesn't work. What I
>> mean by this is that the Transform seems to be capturing data, but doesn't
>> pass it on to subsequent transforms.
>> With DirectRunner, if I send a non-keyed Kafka message to the server it
>> actually crashes (saying that it cannot encode null into a byte[]), hence
>> why I believe the transform is actually running.
>>
>> My main objective really is to create a streaming ExternalTransform for
>> MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam
>> ).
>> I've implemented the builder and registrars and they work in batch mode
>> (with maxNumRecords) but otherwise it fails to read.
>>
>> With MqttIO, the streaming transform gets stuck waiting for one bundle to
>> complete (if I continuously send messages into the MQTT server), and after
>> stopping, the bundles finish but nothing gets passed on either.
>>
>> I appreciate any help I can get with this.
>> Thanks!
>>
>> Cheers
>> Alex
>>
>>
>>


Re: Issues running Kafka streaming pipeline in Python

2021-06-02 Thread Ahmet Altay
/cc @Boyuan Zhang for Kafka and @Chamikara Jayalath for multi-language; they
might be able to help.

On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:

> Hi all,
>
> I have created a simple snippet as such:
>
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
> import logging
> logging.basicConfig(level=logging.WARNING)
>
> opts = PipelineOptions(["--runner=DirectRunner", "--streaming"])
> with beam.Pipeline(options=opts) as p:
>     (
>         p
>         # Read raw (key, value) records from Kafka and log each one.
>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>                                    topics=["topic"])
>         | "log" >> beam.FlatMap(lambda x: [logging.error("%s", str(x))])
>     )
>
> I've set up a Kafka single node similar to the kafka_taxi README, and run
> this both on DirectRunner and DataflowRunner but it doesn't work. What I
> mean by this is that the Transform seems to be capturing data, but doesn't
> pass it on to subsequent transforms.
> With DirectRunner, if I send a non-keyed Kafka message to the server it
> actually crashes (saying that it cannot encode null into a byte[]), hence
> why I believe the transform is actually running.
>
> My main objective really is to create a streaming ExternalTransform for
> MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam
> ).
> I've implemented the builder and registrars and they work in batch mode
> (with maxNumRecords) but otherwise it fails to read.
>
> With MqttIO, the streaming transform gets stuck waiting for one bundle to
> complete (if I continuously send messages into the MQTT server), and after
> stopping, the bundles finish but nothing gets passed on either.
>
> I appreciate any help I can get with this.
> Thanks!
>
> Cheers
> Alex
>
>
>


Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Reuven Lax
I don't think this bug is schema specific - we created a Java object that
is inconsistent with its encoded form, which could happen to any transform.

This does seem to be a gap in DirectRunner testing though. It also makes it
hard to test using PAssert, as I believe that puts everything in a side
input, forcing an encoding/decoding.

On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette  wrote:

> +dev 
>
> > I bet the DirectRunner is encoding and decoding in between, which fixes
> the object.
>
> Do we need better testing of schema-aware (and potentially other built-in)
> transforms in the face of fusion to root out issues like this?
>
> Brian
>
> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang 
> wrote:
>
>> I have some other work-related things I need to do this week, so I will
>> likely report back on this over the weekend.  Thank you for the
>> explanation.  It makes perfect sense now.
>>
>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>>
>>> Some more context - the problem is that RenameFields outputs (in this
>>> case) Java Row objects that are inconsistent with the actual schema.
>>> For example if you have the following schema:
>>>
>>> Row {
>>>field1: Row {
>>>   field2: string
>>> }
>>> }
>>>
>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>
>>> Row {
>>>   field1: Row {
>>>  renamed: string
>>>}
>>> }
>>>
>>> However the Java object for the _nested_ row will return the old schema
>>> if getSchema() is called on it. This is because we only update the schema
>>> on the top-level row.
>>>
>>> I think this explains why your test works in the direct runner. If the
>>> row ever goes through an encode/decode path, it will come back correct. The
>>> original incorrect Java objects are no longer around, and new (consistent)
>>> objects are constructed from the raw data and the PCollection schema.
>>> Dataflow tends to fuse ParDos together, so the following ParDo will see the
>>> incorrect Row object. I bet the DirectRunner is encoding and decoding in
>>> between, which fixes the object.
>>>
>>> You can validate this theory by forcing a shuffle after RenameFields
>>> using Reshuffle. It should fix the issue. If it does, let me know and I'll
>>> work on a fix to RenameFields.
>>>
>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:
>>>
 Aha, yes this is indeed another bug in the transform. The schema is set on
 the top-level Row but not on any nested rows.

 On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang 
 wrote:

> Thank you everyone for your input.  I believe it will be easiest to
> respond to all feedback in a single message rather than messages per 
> person.
>
>- NeedsRunner - The tests are run eventually, so obviously all
>good on my end.  I was trying to run the smallest subset of test cases
>possible and didn't venture beyond `gradle test`.
>- Stack Trace - There wasn't any unfortunately because no
>exception thrown in the code.  The Beam Row was translated into a BQ
>TableRow and an insertion was attempted.  The error "message" was part 
> of
>the response JSON that came back as a result of a request against the 
> BQ
>API.
>- Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>field0_1.nestedStringField is what I am looking for.
>- Info Logging Findings (In Lieu of a Stack Trace)
>   - The Beam Schema was as expected with all renames applied.
>   - The example I provided was heavily stripped down in order to
>   isolate the problem.  My work example which a bit impractical 
> because it's
>   part of some generic tooling has 4 levels of nesting and also 
> produces the
>   correct output too.
>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow
>   in DirectRunner.  In DataflowRunner however, only the top-level 
> renames
>   were reflected in the TableRow and all renames in the nested fields 
> weren't.
>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>   uses the Row.schema to get the field names.  This makes sense to 
> me, but if
>   a value is actually a Row then its schema appears to be 
> inconsistent with
>   the top-level schema
>- My Current Workaround - I forked RenameFields and replaced the
>attachValues in expand method to be a "deep" rename.  This is obviously
>inefficient and I will not be submitting a PR for that.
>- JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>
>
> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax  wrote:
>
>> This transform is the same across all runners. A few comments on the
>> test:
>>
>>   - Using attachValues directly is error prone (per the comment on
>> the method). I recommend using the withFieldValue builders instead.
>>   - I recommend 

unsubscribe

2021-06-02 Thread Pasan Kamburugamuwa
Please can you unsubscribe me.

Thank you
Pasan


Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Brian Hulette
+dev 

> I bet the DirectRunner is encoding and decoding in between, which fixes
the object.

Do we need better testing of schema-aware (and potentially other built-in)
transforms in the face of fusion to root out issues like this?

Brian

On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang 
wrote:

> I have some other work-related things I need to do this week, so I will
> likely report back on this over the weekend.  Thank you for the
> explanation.  It makes perfect sense now.
>
> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>
>> Some more context - the problem is that RenameFields outputs (in this
>> case) Java Row objects that are inconsistent with the actual schema.
>> For example if you have the following schema:
>>
>> Row {
>>field1: Row {
>>   field2: string
>> }
>> }
>>
>> And rename field1.field2 -> renamed, you'll get the following schema
>>
>> Row {
>>   field1: Row {
>>  renamed: string
>>}
>> }
>>
>> However the Java object for the _nested_ row will return the old schema
>> if getSchema() is called on it. This is because we only update the schema
>> on the top-level row.
>>
>> I think this explains why your test works in the direct runner. If the
>> row ever goes through an encode/decode path, it will come back correct. The
>> original incorrect Java objects are no longer around, and new (consistent)
>> objects are constructed from the raw data and the PCollection schema.
>> Dataflow tends to fuse ParDos together, so the following ParDo will see the
>> incorrect Row object. I bet the DirectRunner is encoding and decoding in
>> between, which fixes the object.
>>
>> You can validate this theory by forcing a shuffle after RenameFields
>> using Reshuffle. It should fix the issue. If it does, let me know and I'll
>> work on a fix to RenameFields.
>>
>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:
>>
>>> Aha, yes this is indeed another bug in the transform. The schema is set on
>>> the top-level Row but not on any nested rows.
>>>
>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang 
>>> wrote:
>>>
 Thank you everyone for your input.  I believe it will be easiest to
 respond to all feedback in a single message rather than messages per 
 person.

- NeedsRunner - The tests are run eventually, so obviously all good
on my end.  I was trying to run the smallest subset of test cases 
 possible
and didn't venture beyond `gradle test`.
    - Stack Trace - There wasn't any, unfortunately, because no exception
    was thrown in the code.  The Beam Row was translated into a BQ TableRow
    and an insertion was attempted.  The error "message" was part of the
    response JSON that came back as a result of a request against the BQ API.
- Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
field0_1.nestedStringField is what I am looking for.
- Info Logging Findings (In Lieu of a Stack Trace)
   - The Beam Schema was as expected with all renames applied.
    - The example I provided was heavily stripped down in order to isolate
    the problem.  My work example, which is a bit impractical because it's
    part of some generic tooling, has 4 levels of nesting and also produces
    the correct output.
   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
   DirectRunner.  In DataflowRunner however, only the top-level renames 
 were
   reflected in the TableRow and all renames in the nested fields 
 weren't.
   - BigQueryUtils.toTableRow(Row) recurses on the Row values and
   uses the Row.schema to get the field names.  This makes sense to me, 
 but if
   a value is actually a Row then its schema appears to be inconsistent 
 with
   the top-level schema
- My Current Workaround - I forked RenameFields and replaced the
attachValues in expand method to be a "deep" rename.  This is obviously
inefficient and I will not be submitting a PR for that.
- JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442


 On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax  wrote:

> This transform is the same across all runners. A few comments on the
> test:
>
>   - Using attachValues directly is error prone (per the comment on the
> method). I recommend using the withFieldValue builders instead.
>   - I recommend capturing the RenameFields PCollection into a local
> variable of type PCollection and printing out the schema (which you
> can get using the PCollection.getSchema method) to ensure that the output
> schema looks like you expect.
>   - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
> nestedStringField results in field0_1.nestedStringField; if you wanted to
> flatten, then the better transform would be
> Select.fieldNameAs("field0_1.field1_0", nestedStringField).

Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Matthew Ouyang
I have some other work-related things I need to do this week, so I will
likely report back on this over the weekend.  Thank you for the
explanation.  It makes perfect sense now.

On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:

> Some more context - the problem is that RenameFields outputs (in this
> case) Java Row objects that are inconsistent with the actual schema.
> For example if you have the following schema:
>
> Row {
>field1: Row {
>   field2: string
> }
> }
>
> And rename field1.field2 -> renamed, you'll get the following schema
>
> Row {
>   field1: Row {
>  renamed: string
>}
> }
>
> However the Java object for the _nested_ row will return the old schema if
> getSchema() is called on it. This is because we only update the schema on
> the top-level row.
>
> I think this explains why your test works in the direct runner. If the row
> ever goes through an encode/decode path, it will come back correct. The
> original incorrect Java objects are no longer around, and new (consistent)
> objects are constructed from the raw data and the PCollection schema.
> Dataflow tends to fuse ParDos together, so the following ParDo will see the
> incorrect Row object. I bet the DirectRunner is encoding and decoding in
> between, which fixes the object.
>
> You can validate this theory by forcing a shuffle after RenameFields using
> Reshuffle. It should fix the issue. If it does, let me know and I'll work
> on a fix to RenameFields.
>
> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:
>
>> Aha, yes this is indeed another bug in the transform. The schema is set on
>> the top-level Row but not on any nested rows.
>>
>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang 
>> wrote:
>>
>>> Thank you everyone for your input.  I believe it will be easiest to
>>> respond to all feedback in a single message rather than messages per person.
>>>
>>>- NeedsRunner - The tests are run eventually, so obviously all good
>>>on my end.  I was trying to run the smallest subset of test cases 
>>> possible
>>>and didn't venture beyond `gradle test`.
>>>- Stack Trace - There wasn't any unfortunately because no exception
>>>thrown in the code.  The Beam Row was translated into a BQ TableRow and 
>>> an
>>>insertion was attempted.  The error "message" was part of the response 
>>> JSON
>>>that came back as a result of a request against the BQ API.
>>>- Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>field0_1.nestedStringField is what I am looking for.
>>>- Info Logging Findings (In Lieu of a Stack Trace)
>>>   - The Beam Schema was as expected with all renames applied.
>>>   - The example I provided was heavily stripped down in order to
>>>   isolate the problem.  My work example which a bit impractical because 
>>> it's
>>>   part of some generic tooling has 4 levels of nesting and also 
>>> produces the
>>>   correct output too.
>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
>>>   DirectRunner.  In DataflowRunner however, only the top-level renames 
>>> were
>>>   reflected in the TableRow and all renames in the nested fields 
>>> weren't.
>>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>>>   uses the Row.schema to get the field names.  This makes sense to me, 
>>> but if
>>>   a value is actually a Row then its schema appears to be inconsistent 
>>> with
>>>   the top-level schema
>>>- My Current Workaround - I forked RenameFields and replaced the
>>>attachValues in expand method to be a "deep" rename.  This is obviously
>>>inefficient and I will not be submitting a PR for that.
>>>- JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>
>>>
>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax  wrote:
>>>
 This transform is the same across all runners. A few comments on the
 test:

   - Using attachValues directly is error prone (per the comment on the
 method). I recommend using the withFieldValue builders instead.
   - I recommend capturing the RenameFields PCollection into a local
 variable of type PCollection and printing out the schema (which you
 can get using the PCollection.getSchema method) to ensure that the output
 schema looks like you expect.
   - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
 nestedStringField results in field0_1.nestedStringField; if you wanted to
 flatten, then the better transform would be
 Select.fieldNameAs("field0_1.field1_0", nestedStringField).

 This all being said, eyeballing the implementation of RenameFields
 makes me think that it is buggy in the case where you specify a top-level
 field multiple times like you do. I think it is simply adding the top-level
 field into the output schema multiple times, and the second time is with
 the field0_1 base name; I have no idea why your test doesn't catch this in
 the