Re: Resource Consumption increase With TupleTag

2020-08-19 Thread Talat Uyarer
The Filter step is an independent step; think of it as an ETL step or
something similar. The MessageExtractor step writes messages to TupleTags based on
the Kafka header. Yes, MessageExtractor is literally a multi-output DoFn
already. MessageExtractor processes 48kps, but each branch processes only
its own logs: each Filter consumes only its log type, with no overlap between
them. That's why I assumed it would consume the same number of workers. But it
consumes more workers.



                                   |--->Filter1(20kps)-->WriteGCS
KafkaIO->MessageExtractor(48kps)-> |
                                   |--->Filter2(28kps)-->WriteGCS
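For reference, a minimal sketch of what such a header-routing multi-output MessageExtractor might look like (the header name "log-type", the type values, and the String payloads are assumptions for illustration, not the actual pipeline code):

```java
import java.nio.charset.StandardCharsets;

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

// Hypothetical sketch: route each Kafka record to a TupleTag based on a header.
class MessageExtractor extends DoFn<KafkaRecord<String, String>, String> {
  static final TupleTag<String> TYPE_1 = new TupleTag<String>() {};
  static final TupleTag<String> TYPE_2 = new TupleTag<String>() {};

  @ProcessElement
  public void processElement(ProcessContext c) {
    KafkaRecord<String, String> record = c.element();
    // "log-type" is an assumed header name; a real extractor should also
    // handle the header being absent (lastHeader returns null).
    String logType = new String(
        record.getHeaders().lastHeader("log-type").value(), StandardCharsets.UTF_8);
    if ("type1".equals(logType)) {
      c.output(record.getKV().getValue());          // main output tag (TYPE_1)
    } else {
      c.output(TYPE_2, record.getKV().getValue());  // additional output tag
    }
  }
}
```

Applied with both tags, e.g. `ParDo.of(new MessageExtractor()).withOutputTags(MessageExtractor.TYPE_1, TupleTagList.of(MessageExtractor.TYPE_2))`, which yields a PCollectionTuple the per-type Filters can branch on.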

Do you mean I can put my simple pipeline into one Dataflow job multiple
times, once per topic? Is there any side effect to having multiple independent
DAGs in one DF job? And also, why is the TupleTag model not working properly?
Why is it using more resources than it should?

Thanks



On Wed, Aug 19, 2020 at 5:16 PM Robert Bradshaw  wrote:

> Just to clarify, previously you had:
>
> KafkaIO(topic1) --20kps--> Filter1 -> WriteGCS
> KafkaIO(topic2) --28kps--> Filter2 -> WriteGCS
>
> And now you have
>
>
>                          ---48kps--> Filter1 -> WriteGCS
>                         /
> KafkaIO(topic1, topic2) + MessageExtractor
>                         \
>                          ---48kps--> Filter2 -> WriteGCS
>
> Each filter is now actually consuming (and throwing away) more data than
> before.
>
> Or is MessageExtractor literally a multi-output DoFn already (which is
> why you're talking about TupleTags). This could possibly be more
> expensive if reading Kafka with headers is more expensive than reading
> it without.
>
> If topic1 and topic2 are truly independent, I would keep their reads
> separate. This will simplify your pipeline (and sounds like it'll
> improve performance). Note that you don't have to have a separate
> Dataflow job for each read, you can have a single Pipeline and do as
> many reads as you want and they'll all get executed in the same job.
>
> On Wed, Aug 19, 2020 at 4:14 PM Talat Uyarer
>  wrote:
> >
> > Hi Robert,
> >
> > I calculated processing speed based on worker count. When I had separate
> jobs, the topic1 job used 5 workers and the topic2 job used 7 workers. Based on
> the KafkaIO message count, they had a processing speed of 4kps per worker. After I
> combined them into one DF job, that job started using ~18 workers, not 12
> workers.
> >
> > How can I tell whether they are poorly fused or not? I cannot rewrite the
> Filter as a DoFn because it is BeamSQL. I just wanted to simplify my DAG; that's
> why I did not mention it.
> >
> > Thanks
> >
> > On Wed, Aug 19, 2020 at 3:54 PM Robert Bradshaw 
> wrote:
> >>
> >> Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
> >> would be 4kps total), or only 2kps coming out of KafkaIO and
> >> MessageExtractor?
> >>
> >> Though it /shouldn't/ matter, due to sibling fusion, there's a chance
> >> things are getting fused poorly and you could write Filter1 and
> >> Filter2 instead as a DoFn with multiple outputs (see
> >>
> https://beam.apache.org/documentation/programming-guide/#additional-outputs).
> >>
> >> - Robert
> >>
> >> On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
> >>  wrote:
> >> >
> >> > Hi,
> >> >
> >> > I have a very simple DAG on my dataflow job.
> (KafkaIO->Filter->WriteGCS). When I submit this Dataflow job per topic, it
> has a processing speed of 4kps per instance. However, I want to consume two
> different topics in one DF job, so I used TupleTag: I created TupleTags per
> message type. Each topic has different message types and also needs
> different filters, so my pipeline turned into the DAG below. MessageExtractor
> is a very simple step that checks the header of each Kafka message and writes
> to the correct TupleTag. However, after starting to use this new DAG,
> Dataflow can process only 2kps per instance.
> >> >
> >> >
> >> >                             |--->Filter1-->WriteGCS
> >> > KafkaIO->MessageExtractor-> |
> >> >                             |--->Filter2-->WriteGCS
> >> >
> >> > Do you have any idea why my data process speed decreased ?
> >> >
> >> > Thanks
>
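Robert's suggestion of keeping the reads separate within a single job might look roughly like the following sketch (the broker address, topic names, and GCS paths are placeholders, and the per-topic BeamSQL filters from the real pipeline are elided):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class TwoIndependentReads {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Two fully independent DAGs in one job: each branch reads only its own
    // topic, so no branch ever has to filter out the other topic's traffic.
    for (String topic : new String[] {"topic1", "topic2"}) {
      p.apply("Read_" + topic,
              KafkaIO.<String, String>read()
                  .withBootstrapServers("broker:9092")   // placeholder
                  .withTopic(topic)
                  .withKeyDeserializer(StringDeserializer.class)
                  .withValueDeserializer(StringDeserializer.class)
                  .withoutMetadata())
          .apply("Values_" + topic, Values.create())
          // ... the per-topic BeamSQL filter would go here ...
          .apply("Window_" + topic,
              Window.into(FixedWindows.of(Duration.standardMinutes(5))))
          .apply("Write_" + topic,
              TextIO.write()
                  .to("gs://my-bucket/" + topic + "/")   // placeholder
                  .withWindowedWrites()
                  .withNumShards(1));
    }
    p.run();
  }
}
```

Both branches are submitted as one Dataflow job; the windowed write is needed because TextIO requires windows and a fixed shard count for unbounded input.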


Re: Resource Consumption increase With TupleTag

2020-08-19 Thread Talat Uyarer
Hi Robert,

I calculated processing speed based on worker count. When I had
separate jobs, the topic1 job used 5 workers and the topic2 job used 7 workers.
Based on the KafkaIO message count, they had a processing speed of 4kps per
worker. After I combined them into one DF job, that job started using ~18
workers, not 12 workers.

How can I tell whether they are poorly fused or not? I cannot rewrite the
Filter as a DoFn because it is BeamSQL. I just wanted to simplify my DAG;
that's why I did not mention it.

Thanks

On Wed, Aug 19, 2020 at 3:54 PM Robert Bradshaw  wrote:

> Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
> would be 4kps total), or only 2kps coming out of KafkaIO and
> MessageExtractor?
>
> Though it /shouldn't/ matter, due to sibling fusion, there's a chance
> things are getting fused poorly and you could write Filter1 and
> Filter2 instead as a DoFn with multiple outputs (see
>
> https://beam.apache.org/documentation/programming-guide/#additional-outputs).
>
> - Robert
>
> On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
>  wrote:
> >
> > Hi,
> >
> > I have a very simple DAG on my dataflow job.
> (KafkaIO->Filter->WriteGCS). When I submit this Dataflow job per topic, it
> has a processing speed of 4kps per instance. However, I want to consume two
> different topics in one DF job, so I used TupleTag: I created TupleTags per
> message type. Each topic has different message types and also needs
> different filters, so my pipeline turned into the DAG below. MessageExtractor
> is a very simple step that checks the header of each Kafka message and writes
> to the correct TupleTag. However, after starting to use this new DAG,
> Dataflow can process only 2kps per instance.
> >
> >                             |--->Filter1-->WriteGCS
> > KafkaIO->MessageExtractor-> |
> >                             |--->Filter2-->WriteGCS
> >
> > Do you have any idea why my data process speed decreased ?
> >
> > Thanks
>


Re: Resource Consumption increase With TupleTag

2020-08-19 Thread Robert Bradshaw
Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
would be 4kps total), or only 2kps coming out of KafkaIO and
MessageExtractor?

Though it /shouldn't/ matter, due to sibling fusion, there's a chance
things are getting fused poorly and you could write Filter1 and
Filter2 instead as a DoFn with multiple outputs (see
https://beam.apache.org/documentation/programming-guide/#additional-outputs).

- Robert

On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
 wrote:
>
> Hi,
>
> I have a very simple DAG in my Dataflow job (KafkaIO->Filter->WriteGCS).
> When I submit this Dataflow job per topic, it has a processing speed of 4kps
> per instance. However, I want to consume two different topics in one DF job,
> so I used TupleTag: I created TupleTags per message type. Each topic has
> different message types and also needs different filters, so my pipeline
> turned into the DAG below. MessageExtractor is a very simple step that checks
> the header of each Kafka message and writes to the correct TupleTag. However,
> after starting to use this new DAG, Dataflow can process only 2kps per instance.
>
>                             |--->Filter1-->WriteGCS
> KafkaIO->MessageExtractor-> |
>                             |--->Filter2-->WriteGCS
>
> Do you have any idea why my data process speed decreased ?
>
> Thanks


Resource Consumption increase With TupleTag

2020-08-19 Thread Talat Uyarer
Hi,

I have a very simple DAG in my Dataflow job (KafkaIO->Filter->WriteGCS).
When I submit this Dataflow job per topic, it has a processing speed of 4kps
per instance. However, I want to consume two different topics in one DF job,
so I used TupleTag: I created TupleTags per message type. Each topic has
different message types and also needs different filters, so my pipeline
turned into the DAG below. MessageExtractor is a very simple step that checks
the header of each Kafka message and writes to the correct TupleTag. However,
after starting to use this new DAG, Dataflow can process only 2kps per
instance.

                            |--->Filter1-->WriteGCS
KafkaIO->MessageExtractor-> |
                            |--->Filter2-->WriteGCS

Do you have any idea why my data process speed decreased ?

Thanks


Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-08-19 Thread Brian Hulette
It looks like this is occurring because we don't actually support mixing
SchemaProviders in nested types. The current SchemaProvider implementations
only support nested types for homogeneous types (e.g. an AutoValue with an
AutoValue field). So when you use JavaFieldSchema as the SchemaProvider for
the outer type (EnrichedArticle), it is also used recursively for the inner
type (ArticleEnvelope), rather than using the registered ProtoMessageSchema.

I filed BEAM-10765 [1] to add support for inferring schemas for
non-homogeneous types, I think it's something we should be able to support.
I know it's been a while since you reported this; have you found a
workaround in the meantime? Your best bet may be to avoid using
ProtoMessageSchema for the inner class for now and use the same style of
class for the outer and inner class by just creating a POJO or AutoValue
that replicates the ArticleEnvelope class.
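A sketch of that workaround, with invented `id` and `title` fields since the real ArticleEnvelope definition isn't shown here:

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

// Plain POJO mirroring the proto-generated ArticleEnvelope (fields are
// illustrative), so the whole EnrichedArticle tree is inferred by a single
// SchemaProvider (JavaFieldSchema) instead of mixing providers.
@DefaultSchema(JavaFieldSchema.class)
class ArticleEnvelopePojo {
  public final String id;
  public final String title;

  @SchemaCreate
  public ArticleEnvelopePojo(String id, String title) {
    this.id = id;
    this.title = title;
  }
}

@DefaultSchema(JavaFieldSchema.class)
class EnrichedArticle {
  public final ArticleEnvelopePojo article;  // homogeneous nesting: POJO in POJO
  public final String enrichment;

  @SchemaCreate
  public EnrichedArticle(ArticleEnvelopePojo article, String enrichment) {
    this.article = article;
    this.enrichment = enrichment;
  }
}
```

With both levels using JavaFieldSchema, schema inference should recurse without hitting the mixed-provider limitation described above.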


Luke: Regarding recursive schemas, Reuven and I have had some discussions
about it offline. I think he said it should be feasible but I don't know
much beyond that.

Brian

[1] https://issues.apache.org/jira/browse/BEAM-10765

On Tue, Jun 30, 2020 at 2:10 AM Kaymak, Tobias 
wrote:

> I want to make my example as simple as possible while also not leaving out
> the details that might be the reason for the error. I don't think there is
> any recursiveness.
> I can also share the ArticleEnvelope Protobuf file If that helps. I've
> tried to register the ArticleEnvelope schema like this:
>
> TestPipeline p = TestPipeline.create();
> TypeDescriptor<ArticleProto.ArticleEnvelope> articleEnvelopeTypeDescriptor =
> TypeDescriptor.of(ArticleProto.ArticleEnvelope.class);
> Schema articleSchema =
> new
> ProtoMessageSchema().schemaFor(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>
> SerializableFunction<ArticleProto.ArticleEnvelope, Row> articleEnvelopeToRow =
> new
> ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>
> SerializableFunction<Row, ArticleProto.ArticleEnvelope> articleEnvelopeFromRow =
> new
> ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>
>
> p.getSchemaRegistry().registerSchemaForClass(ArticleProto.ArticleEnvelope.class,
> articleSchema,articleEnvelopeToRow,articleEnvelopeFromRow);
>
> The problem is that even when I define and register it like above, as soon
> as I annotate the class EnrichedArticle with 
> @DefaultSchema(JavaFieldSchema.class)
> I get:
>
> Caused by: java.lang.IllegalAccessError: tried to access method
> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
> from class
> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO
> at
> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO.create(Unknown
> Source)
> at
> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
> at
> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
> at
> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
> at
> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
> at
> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
> at
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
> at
> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
> at
> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
> at
> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
> at
> ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:414)
>
> So it does not seem to have an effect when the annotation on
> EnrichedArticle is present. Without the annotation however, there is no
> schema defined on the output PCollection, so I have to define it myself for

Re: Registering Protobuf schema

2020-08-19 Thread Luke Cwik
With providers there is also an ordering issue since multiple providers
could work for a given type so we want to apply them using some stable
ordering.

On Wed, Aug 19, 2020 at 10:08 AM Brian Hulette  wrote:

> Ah yes, the SchemaRegistry and SchemaProvider follow the same model, but
> none of the SchemaProviders are registered by default. Users can register
> the proto schema provider with
> registerSchemaProvider(Class) [1]:
>
>   p.getSchemaRegistry().registerSchemaProvider(ProtoMessageSchema.class);
>
> Then SchemaCoder should be used for all proto classes.
> We could use ServiceLoader to register all schema providers, then users
> wouldn't need to do this. I assume the reason we don't already is because
> schemas are still experimental and we want it to be opt-in.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/schemas/SchemaRegistry.html#registerSchemaProvider-org.apache.beam.sdk.schemas.SchemaProvider-
> 
>
> On Wed, Aug 19, 2020 at 8:44 AM Luke Cwik  wrote:
>
>> Brian, Coders have a provider model where the provider can be queried to
>> resolve for a given type and the providers are resolved in a specific
>> order. This gave the flexibility to handle situations like the one you
>> described.
>>
>> On Wed, Aug 19, 2020 at 12:30 AM 
>> wrote:
>>
>>> Hi Brian,
>>>
>>>
>>>
>>> Many thanks for your mail.
>>>
>>>
>>>
>>> Yes I figured that one out in the end from the docs, but many thanks for
>>> confirming.
>>>
>>>
>>>
>>> I did subsequently discover some other issues with protoBuf-derived
>>> schemas (essentially they don’t seem to be properly supported by
>>> BigQueryIO.Write or allow for optional fields) but I posted a separate
>>> message on the dev channel covering this.
>>>
>>>
>>>
>>> Kind regards,
>>>
>>>
>>>
>>> Rob
>>>
>>>
>>>
>>> *From:* Brian Hulette [mailto:bhule...@google.com]
>>> *Sent:* 18 August 2020 20:50
>>> *To:* user
>>> *Subject:* Re: Registering Protobuf schema
>>>
>>>
>>>
>>>
>>> *
>>> "This is an external email. Do you know who has sent it? Can you be sure
>>> that any links and attachments contained within it are safe? If in any
>>> doubt, use the Phishing Reporter Button in your Outlook client or forward
>>> the email as an attachment to ~ I've Been Phished"
>>> *
>>>
>>> Hi Robert,
>>> Sorry for the late reply on this. I think you should be able to do this
>>> by registering it in your pipeline's SchemaRegistry manually, like so:
>>>
>>>
>>>
>>>   Pipeline p;
>>>
>>>   p.getSchemaRegistry().registerSchemaProvider(Fx.class,
>>> ProtoMessageSchema.class);
>>>
>>> Of course this isn't quite as nice as just adding the DefaultSchema
>>> annotation to a class you control. Maybe we should consider some global
>>> config that would always use schemas for proto-generated classes.
>>>
>>>
>>> Brian
>>>
>>>
>>>
>>> On Sun, Jul 12, 2020 at 9:56 AM Kaymak, Tobias 
>>> wrote:
>>>
>>> This sounds like it is related to the problem I'm trying to solve. (In
>>> my case having a Java POJO containing a protobuf backed-class and trying to
>>> generate a Beam Schema from it.)
>>>
>>> I would be very interested to a solution to this as well :)
>>>
>>>
>>>
>>> On Tue, Jul 7, 2020 at 2:22 PM 
>>> wrote:
>>>
>>> Hi All,
>>>
>>>
>>>
>>> I have a BEAM pipeline where I am reading data from some parquet files
>>> and converting them into a different format based on protobuf generated
>>> classes.
>>>
>>>
>>>
>>> I wish to associate a schema (derived from the protobuf classes) for my
>>> PCollections.  What is the appropriate way to do this with
>>> protobuf-generated classes?
>>>
>>>
>>>
>>> Code excerpt:
>>>
>>>
>>>
>>> PCollection result = input.apply("FXFilePattern", FileIO.*match*
>>> ().filepattern(fxDataFilePattern))
>>> .apply("FXReadMatches", FileIO.*readMatches*())
>>> .apply("FXReadParquetFile", ParquetIO.*readFiles*(fxAvroSchema))
>>> .apply("MapFXToProto", ParDo.*of*(MapFXToProto.*of*()));
>>> boolean hasSchema = result.hasSchema();  // returns false
>>>
>>>
>>>
>>> With thanks in advance.
>>>
>>>
>>>
>>> Kind regards,
>>>
>>>
>>>
>>> Rob
>>>
>>>
>>>
>>> *Robert Butcher*
>>>
>>> *Technical Architect | Foundry/SRS | NatWest Markets*
>>>
>>> WeWork, 10 Devonshire Square, London, EC2M 4AE
>>>
>>> Mobile +44 (0) 7414 730866 <+44%207414%20730866>
>>>
>>>
>>>
>>> This email is classified as *CONFIDENTIAL* unless otherwise stated.
>>>
>>>
>>>
>>>
>>>
>>> This communication and any attachments are confidential and intended
>>> solely for the addressee. If you are not the intended recipient please
>>> advise us immediately and delete it. Unless specifically stated in the
>>> message or otherwise indicated, you may not duplicate, redistribute or
>>> forward this message and any attachments are not inten

Re: Registering Protobuf schema

2020-08-19 Thread Brian Hulette
Ah yes, the SchemaRegistry and SchemaProvider follow the same model, but
none of the SchemaProviders are registered by default. Users can register
the proto schema provider with
registerSchemaProvider(Class) [1]:

  p.getSchemaRegistry().registerSchemaProvider(ProtoMessageSchema.class);

Then SchemaCoder should be used for all proto classes.
We could use ServiceLoader to register all schema providers, then users
wouldn't need to do this. I assume the reason we don't already is because
schemas are still experimental and we want it to be opt-in.

[1]
https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/schemas/SchemaRegistry.html#registerSchemaProvider-org.apache.beam.sdk.schemas.SchemaProvider-


On Wed, Aug 19, 2020 at 8:44 AM Luke Cwik  wrote:

> Brian, Coders have a provider model where the provider can be queried to
> resolve for a given type and the providers are resolved in a specific
> order. This gave the flexibility to handle situations like the one you
> described.
>
> On Wed, Aug 19, 2020 at 12:30 AM 
> wrote:
>
>> Hi Brian,
>>
>>
>>
>> Many thanks for your mail.
>>
>>
>>
>> Yes I figured that one out in the end from the docs, but many thanks for
>> confirming.
>>
>>
>>
>> I did subsequently discover some other issues with protoBuf-derived
>> schemas (essentially they don’t seem to be properly supported by
>> BigQueryIO.Write or allow for optional fields) but I posted a separate
>> message on the dev channel covering this.
>>
>>
>>
>> Kind regards,
>>
>>
>>
>> Rob
>>
>>
>>
>> *From:* Brian Hulette [mailto:bhule...@google.com]
>> *Sent:* 18 August 2020 20:50
>> *To:* user
>> *Subject:* Re: Registering Protobuf schema
>>
>>
>>
>>
>>
>> Hi Robert,
>> Sorry for the late reply on this. I think you should be able to do this
>> by registering it in your pipeline's SchemaRegistry manually, like so:
>>
>>
>>
>>   Pipeline p;
>>
>>   p.getSchemaRegistry().registerSchemaProvider(Fx.class,
>> ProtoMessageSchema.class);
>>
>> Of course this isn't quite as nice as just adding the DefaultSchema
>> annotation to a class you control. Maybe we should consider some global
>> config that would always use schemas for proto-generated classes.
>>
>>
>> Brian
>>
>>
>>
>> On Sun, Jul 12, 2020 at 9:56 AM Kaymak, Tobias 
>> wrote:
>>
>> This sounds like it is related to the problem I'm trying to solve. (In my
>> case having a Java POJO containing a protobuf backed-class and trying to
>> generate a Beam Schema from it.)
>>
>> I would be very interested to a solution to this as well :)
>>
>>
>>
>> On Tue, Jul 7, 2020 at 2:22 PM  wrote:
>>
>> Hi All,
>>
>>
>>
>> I have a BEAM pipeline where I am reading data from some parquet files
>> and converting them into a different format based on protobuf generated
>> classes.
>>
>>
>>
>> I wish to associate a schema (derived from the protobuf classes) for my
>> PCollections.  What is the appropriate way to do this with
>> protobuf-generated classes?
>>
>>
>>
>> Code excerpt:
>>
>>
>>
>> PCollection result = input.apply("FXFilePattern", FileIO.*match*
>> ().filepattern(fxDataFilePattern))
>> .apply("FXReadMatches", FileIO.*readMatches*())
>> .apply("FXReadParquetFile", ParquetIO.*readFiles*(fxAvroSchema))
>> .apply("MapFXToProto", ParDo.*of*(MapFXToProto.*of*()));
>> boolean hasSchema = result.hasSchema();  // returns false
>>
>>
>>
>> With thanks in advance.
>>
>>
>>
>> Kind regards,
>>
>>
>>
>> Rob
>>
>>
>>
>> *Robert Butcher*
>>
>> *Technical Architect | Foundry/SRS | NatWest Markets*
>>
>> WeWork, 10 Devonshire Square, London, EC2M 4AE
>>
>> Mobile +44 (0) 7414 730866 <+44%207414%20730866>
>>
>>
>>
>> This email is classified as *CONFIDENTIAL* unless otherwise stated.
>>
>>
>>
>>
>>

Re: Registering Protobuf schema

2020-08-19 Thread Luke Cwik
Brian, Coders have a provider model where the provider can be queried to
resolve for a given type and the providers are resolved in a specific
order. This gave the flexibility to handle situations like the one you
described.

On Wed, Aug 19, 2020 at 12:30 AM  wrote:

> Hi Brian,
>
>
>
> Many thanks for your mail.
>
>
>
> Yes I figured that one out in the end from the docs, but many thanks for
> confirming.
>
>
>
> I did subsequently discover some other issues with protoBuf-derived
> schemas (essentially they don’t seem to be properly supported by
> BigQueryIO.Write or allow for optional fields) but I posted a separate
> message on the dev channel covering this.
>
>
>
> Kind regards,
>
>
>
> Rob
>
>
>
> *From:* Brian Hulette [mailto:bhule...@google.com]
> *Sent:* 18 August 2020 20:50
> *To:* user
> *Subject:* Re: Registering Protobuf schema
>
>
>
>
>
> Hi Robert,
> Sorry for the late reply on this. I think you should be able to do this by
> registering it in your pipeline's SchemaRegistry manually, like so:
>
>
>
>   Pipeline p;
>
>   p.getSchemaRegistry().registerSchemaProvider(Fx.class,
> ProtoMessageSchema.class);
>
> Of course this isn't quite as nice as just adding the DefaultSchema
> annotation to a class you control. Maybe we should consider some global
> config that would always use schemas for proto-generated classes.
>
>
> Brian
>
>
>
> On Sun, Jul 12, 2020 at 9:56 AM Kaymak, Tobias 
> wrote:
>
> This sounds like it is related to the problem I'm trying to solve. (In my
> case having a Java POJO containing a protobuf backed-class and trying to
> generate a Beam Schema from it.)
>
> I would be very interested to a solution to this as well :)
>
>
>
> On Tue, Jul 7, 2020 at 2:22 PM  wrote:
>
> Hi All,
>
>
>
> I have a BEAM pipeline where I am reading data from some parquet files and
> converting them into a different format based on protobuf generated classes.
>
>
>
> I wish to associate a schema (derived from the protobuf classes) for my
> PCollections.  What is the appropriate way to do this with
> protobuf-generated classes?
>
>
>
> Code excerpt:
>
>
>
> PCollection result = input.apply("FXFilePattern", FileIO.*match*
> ().filepattern(fxDataFilePattern))
> .apply("FXReadMatches", FileIO.*readMatches*())
> .apply("FXReadParquetFile", ParquetIO.*readFiles*(fxAvroSchema))
> .apply("MapFXToProto", ParDo.*of*(MapFXToProto.*of*()));
> boolean hasSchema = result.hasSchema();  // returns false
>
>
>
> With thanks in advance.
>
>
>
> Kind regards,
>
>
>
> Rob
>
>
>
> *Robert Butcher*
>
> *Technical Architect | Foundry/SRS | NatWest Markets*
>
> WeWork, 10 Devonshire Square, London, EC2M 4AE
>
> Mobile +44 (0) 7414 730866 <+44%207414%20730866>
>
>
>
> This email is classified as *CONFIDENTIAL* unless otherwise stated.
>
>
>
>
>

RE: Registering Protobuf schema

2020-08-19 Thread Robert.Butcher
Hi Brian,

Many thanks for your mail.

Yes I figured that one out in the end from the docs, but many thanks for 
confirming.

I did subsequently discover some other issues with protoBuf-derived schemas 
(essentially they don’t seem to be properly supported by BigQueryIO.Write or 
allow for optional fields) but I posted a separate message on the dev channel 
covering this.

Kind regards,

Rob

From: Brian Hulette [mailto:bhule...@google.com]
Sent: 18 August 2020 20:50
To: user
Subject: Re: Registering Protobuf schema


Hi Robert,
Sorry for the late reply on this. I think you should be able to do this by 
registering it in your pipeline's SchemaRegistry manually, like so:

  Pipeline p;
  p.getSchemaRegistry().registerSchemaProvider(Fx.class, 
ProtoMessageSchema.class);

Of course this isn't quite as nice as just adding the DefaultSchema annotation 
to a class you control. Maybe we should consider some global config that would 
always use schemas for proto-generated classes.

Brian

On Sun, Jul 12, 2020 at 9:56 AM Kaymak, Tobias 
mailto:tobias.kay...@ricardo.ch>> wrote:
This sounds like it is related to the problem I'm trying to solve. (In my case 
having a Java POJO containing a protobuf backed-class and trying to generate a 
Beam Schema from it.)

I would be very interested to a solution to this as well :)

On Tue, Jul 7, 2020 at 2:22 PM 
mailto:robert.butc...@natwestmarkets.com>> 
wrote:
Hi All,

I have a BEAM pipeline where I am reading data from some parquet files and 
converting them into a different format based on protobuf generated classes.

I wish to associate a schema (derived from the protobuf classes) for my 
PCollections.  What is the appropriate way to do this with protobuf-generated 
classes?

Code excerpt:

PCollection result = input.apply("FXFilePattern", 
FileIO.match().filepattern(fxDataFilePattern))
.apply("FXReadMatches", FileIO.readMatches())
.apply("FXReadParquetFile", ParquetIO.readFiles(fxAvroSchema))
.apply("MapFXToProto", ParDo.of(MapFXToProto.of()));
boolean hasSchema = result.hasSchema();  // returns false
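For what it's worth, combining Brian's registration suggestion above with this excerpt would look something like the following sketch (assuming `Fx` is the proto-generated class; whether `hasSchema()` flips to true also depends on the transform's output type being inferable):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;

Pipeline p = Pipeline.create();
// Register the proto schema provider for Fx before building the pipeline...
p.getSchemaRegistry().registerSchemaProvider(Fx.class, ProtoMessageSchema.class);
// ...then the PCollection<Fx> produced by MapFXToProto can pick up a schema
// from the registry, so result.hasSchema() should then return true.
```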

With thanks in advance.

Kind regards,

Rob

Robert Butcher
Technical Architect | Foundry/SRS | NatWest Markets
WeWork, 10 Devonshire Square, London, EC2M 4AE
Mobile +44 (0) 7414 730866

This email is classified as CONFIDENTIAL unless otherwise stated.

