Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Boyuan Zhang
Sorry for the typo. I meant that I think we can go with *(3)* and (4): use the
data type that is schema-aware as the input of ReadAll.

On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang  wrote:

> Thanks for the summary, Cham!
>
> I think we can go with (2) and (4): use the data type that is schema-aware
> as the input of ReadAll.
>
> Converting Read into ReadAll helps us to stick with SDF-like IO. But only
> having  (3) is not enough to solve the problem of using ReadAll in x-lang
> case.
>
> The key point of ReadAll is that the input type of ReadAll should be able
> to cross language boundaries and have compatibilities of
> updating/downgrading. After investigating some possibilities(pure java pojo
> with custom coder, protobuf, row/schema) in Kafka usage, we find that
> row/schema fits our needs most. Here comes (4). I believe that using Read
> as input of ReadAll makes sense in some cases, but I also think not all IOs
> have the same need. I would treat Read as a special type as long as the
> Read is schema-aware.
>
> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath 
> wrote:
>
>> I see. So it seems like there are three options discussed so far when it
>> comes to defining source descriptors for ReadAll type transforms
>>
>> (1) Use Read PTransform as the element type of the input PCollection
>> (2) Use a POJO that describes the source as the data element of the input
>> PCollection
>> (3) Provide a converter as a function to the Read transform which
>> essentially will convert it to a ReadAll (what Eugene mentioned)
>>
>> I feel like (3) is more suitable for a related set of source descriptions
>> such as files.
>> (1) will allow most code-reuse but seems like will make it hard to use
>> the ReadAll transform as a cross-language transform and will break the
>> separation of construction time and runtime constructs
>> (2) could result to less code reuse if not careful but will make the
>> transform easier to be used as a cross-language transform without
>> additional modifications
>>
>> Also, with SDF, we can create ReadAll-like transforms that are more
>> efficient. So we might be able to just define all sources in that format
>> and make Read transforms just an easy to use composite built on top of that
>> (by adding a preceding Create transform).
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik  wrote:
>>
>>> I believe we do require PTransforms to be serializable since anonymous
>>> DoFns typically capture the enclosing PTransform.
>>>
>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
 Seems like Read in PCollection refers to a transform, at least
 here:
 https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353

 I'm in favour of separating construction time transforms from execution
 time data objects that we store in PCollections as Luke mentioned. Also, we
 don't guarantee that PTransform is serializable so users have the
 additional complexity of providing a coder whenever a PTransform is used
 as a data object.
 Also, agree with Boyuan that using simple Java objects that are
 convertible to Beam Rows allow us to make these transforms available to
 other SDKs through the cross-language transforms. Using transforms or
 complex sources as data objects will probably make this difficult.

 Thanks,
 Cham



 On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang 
 wrote:

> Hi Ismael,
>
> I think the ReadAll in the IO connector refers to the IO with SDF
> implementation despite the type of input, where Read refers to
> UnboundedSource.  One major pushback of using KafkaIO.Read as source
> description is that not all configurations of KafkaIO.Read are meaningful
> to populate during execution time. Also when thinking about x-lang usage,
> making source description across language boundaries is also necessary.  
> As
> Luke mentioned, it's quite easy to infer a Schema from an AutoValue 
> object:
> KafkaSourceDescription.java
> .
> Then the coder of this schema-aware object will be a SchemaCoder
> .
> When crossing language boundaries, it's also easy to convert a Row into 
> the
> source description: Convert.fromRows
> 
> .
>
>
> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik  wrote:
>
>> To provide additional context, the KafkaIO ReadAll transform takes a
>> PCollection. This KafkaSourceDescriptor is a POJO

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Boyuan Zhang
Thanks for the summary, Cham!

I think we can go with (2) and (4): use the data type that is schema-aware
as the input of ReadAll.

Converting Read into ReadAll helps us stick with SDF-like IO, but having (3)
alone is not enough to solve the problem of using ReadAll in the x-lang case.

The key point of ReadAll is that its input type should be able to cross
language boundaries and stay compatible across updates/downgrades. After
investigating some possibilities (pure Java POJO with a custom coder, protobuf,
row/schema) for the Kafka use case, we found that row/schema fits our needs
best. Hence (4). I believe that using Read as the input of ReadAll makes sense
in some cases, but not all IOs have the same need. I would treat Read as a
special type, as long as the Read is schema-aware.

On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath 
wrote:

> I see. So it seems like there are three options discussed so far when it
> comes to defining source descriptors for ReadAll type transforms
>
> (1) Use Read PTransform as the element type of the input PCollection
> (2) Use a POJO that describes the source as the data element of the input
> PCollection
> (3) Provide a converter as a function to the Read transform which
> essentially will convert it to a ReadAll (what Eugene mentioned)
>
> I feel like (3) is more suitable for a related set of source descriptions
> such as files.
> (1) will allow most code-reuse but seems like will make it hard to use the
> ReadAll transform as a cross-language transform and will break the
> separation of construction time and runtime constructs
> (2) could result to less code reuse if not careful but will make the
> transform easier to be used as a cross-language transform without
> additional modifications
>
> Also, with SDF, we can create ReadAll-like transforms that are more
> efficient. So we might be able to just define all sources in that format
> and make Read transforms just an easy to use composite built on top of that
> (by adding a preceding Create transform).
>
> Thanks,
> Cham
>
> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik  wrote:
>
>> I believe we do require PTransforms to be serializable since anonymous
>> DoFns typically capture the enclosing PTransform.
>>
>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath 
>> wrote:
>>
>>> Seems like Read in PCollection refers to a transform, at least
>>> here:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>
>>> I'm in favour of separating construction time transforms from execution
>>> time data objects that we store in PCollections as Luke mentioned. Also, we
>>> don't guarantee that PTransform is serializable so users have the
>>> additional complexity of providing a coder whenever a PTransform is used
>>> as a data object.
>>> Also, agree with Boyuan that using simple Java objects that are
>>> convertible to Beam Rows allow us to make these transforms available to
>>> other SDKs through the cross-language transforms. Using transforms or
>>> complex sources as data objects will probably make this difficult.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>
>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang 
>>> wrote:
>>>
 Hi Ismael,

 I think the ReadAll in the IO connector refers to the IO with SDF
 implementation despite the type of input, where Read refers to
 UnboundedSource.  One major pushback of using KafkaIO.Read as source
 description is that not all configurations of KafkaIO.Read are meaningful
 to populate during execution time. Also when thinking about x-lang usage,
 making source description across language boundaries is also necessary.  As
 Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
 KafkaSourceDescription.java
 .
 Then the coder of this schema-aware object will be a SchemaCoder
 .
 When crossing language boundaries, it's also easy to convert a Row into the
 source description: Convert.fromRows
 
 .


 On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik  wrote:

> To provide additional context, the KafkaIO ReadAll transform takes a
> PCollection. This KafkaSourceDescriptor is a POJO
> that contains the configurable parameters for reading from Kafka. This is
> different from the pattern that Ismael listed because they take
> PCollection as input and the Read is the same as the Read PTransform
> class used for the non read all case.
>
> The KafkaSourceDescriptor does lead to duplication since param

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Chamikara Jayalath
I see. So it seems like there are three options discussed so far when it
comes to defining source descriptors for ReadAll-type transforms:

(1) Use Read PTransform as the element type of the input PCollection
(2) Use a POJO that describes the source as the data element of the input
PCollection
(3) Provide a converter as a function to the Read transform which
essentially will convert it to a ReadAll (what Eugene mentioned)

I feel like (3) is more suitable for a related set of source descriptions
such as files.
(1) will allow the most code reuse but seems like it will make it hard to use
the ReadAll transform as a cross-language transform and will break the
separation of construction-time and runtime constructs.
(2) could result in less code reuse if not careful but will make the
transform easier to use as a cross-language transform without
additional modifications.

Also, with SDF, we can create ReadAll-like transforms that are more
efficient. So we might be able to just define all sources in that format
and make Read transforms just an easy-to-use composite built on top of that
(by adding a preceding Create transform).
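
For illustration, a minimal sketch of such a composite (all names here are
placeholders rather than an existing IO, and Beam imports are omitted): Read
just Creates its single source descriptor and defers to the SDF-based ReadAll.

public class Read extends PTransform<PBegin, PCollection<Result>> {
  private final SourceDescriptor descriptor;

  Read(SourceDescriptor descriptor) {
    this.descriptor = descriptor;
  }

  @Override
  public PCollection<Result> expand(PBegin input) {
    return input
        .apply("Create", Create.of(descriptor))  // a single descriptor element
        .apply("ReadAll", new ReadAll());        // SDF-based expansion does the work
  }
}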

Thanks,
Cham

On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik  wrote:

> I believe we do require PTransforms to be serializable since anonymous
> DoFns typically capture the enclosing PTransform.
>
> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath 
> wrote:
>
>> Seems like Read in PCollection refers to a transform, at least
>> here:
>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>
>> I'm in favour of separating construction time transforms from execution
>> time data objects that we store in PCollections as Luke mentioned. Also, we
>> don't guarantee that PTransform is serializable so users have the
>> additional complexity of providing a coder whenever a PTransform is used
>> as a data object.
>> Also, agree with Boyuan that using simple Java objects that are
>> convertible to Beam Rows allow us to make these transforms available to
>> other SDKs through the cross-language transforms. Using transforms or
>> complex sources as data objects will probably make this difficult.
>>
>> Thanks,
>> Cham
>>
>>
>>
>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang  wrote:
>>
>>> Hi Ismael,
>>>
>>> I think the ReadAll in the IO connector refers to the IO with SDF
>>> implementation despite the type of input, where Read refers to
>>> UnboundedSource.  One major pushback of using KafkaIO.Read as source
>>> description is that not all configurations of KafkaIO.Read are meaningful
>>> to populate during execution time. Also when thinking about x-lang usage,
>>> making source description across language boundaries is also necessary.  As
>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
>>> KafkaSourceDescription.java
>>> .
>>> Then the coder of this schema-aware object will be a SchemaCoder
>>> .
>>> When crossing language boundaries, it's also easy to convert a Row into the
>>> source description: Convert.fromRows
>>> 
>>> .
>>>
>>>
>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik  wrote:
>>>
 To provide additional context, the KafkaIO ReadAll transform takes a
 PCollection. This KafkaSourceDescriptor is a POJO
 that contains the configurable parameters for reading from Kafka. This is
 different from the pattern that Ismael listed because they take
 PCollection as input and the Read is the same as the Read PTransform
 class used for the non read all case.

 The KafkaSourceDescriptor does lead to duplication since parameters
 used to configure the transform have to be copied over to the source
 descriptor but decouples how a transform is specified from the object that
 describes what needs to be done. I believe Ismael's point is that we
 wouldn't need such a decoupling.

 Another area that hasn't been discussed and I believe is a non-issue is
 that the Beam Java SDK has the most IO connectors and we would want to use
 the IO implementations within Beam Go and Beam Python. This brings in its
 own set of issues related to versioning and compatibility for the wire
 format and how one parameterizes such transforms. The wire format issue can
 be solved with either approach by making sure that the cross language
 expansion always takes the well known format (whatever it may be) and
 converts it into Read/KafkaSourceDescriptor/... object that is then passed
 to the ReadAll transform. Boyuan has been looking to make the
 KafkaSo

Re: Apache Beam ZeroMQ connector

2020-06-24 Thread Luke Cwik
I'm not aware of any ZeroMQ connector implementations that are part of
Apache Beam.

On Wed, Jun 24, 2020 at 11:44 AM Sherif A. Kozman <
sherif.koz...@extremesolution.com> wrote:

> Hello,
>
> We were in the process of planning a deployment of exporting stream data
> from Aruba Networks Analytics engine through Apache beam and it turns out
> that it utilizes ZeroMQ for messaging.
> We couldn't find any ZeroMQ connectors and were wondering if it does exist
> or it would be compatible with other connectors for Apache Beam.
>
> Thanks
> Sherif Kozman
>


Apache Beam ZeroMQ connector

2020-06-24 Thread Sherif A. Kozman
Hello,

We were in the process of planning a deployment that exports stream data
from the Aruba Networks Analytics engine through Apache Beam, and it turns out
that it utilizes ZeroMQ for messaging.
We couldn't find any ZeroMQ connectors and were wondering whether one exists
or whether it would be compatible with other connectors for Apache Beam.

Thanks
Sherif Kozman



Sherif Kozman

*Extreme Solution*

A: 17875 Von Karman Ave suite 150, Irvine, CA, 92614

P: +1 714 719 2237 <+1-714-719-2237>M: +20 122 2426241 <+20-122-2426241>

E: she...@extremesolution.comW: www.extremesolution.com












Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Luke Cwik
I believe we do require PTransforms to be serializable since anonymous
DoFns typically capture the enclosing PTransform.
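
To illustrate (this is a made-up transform, not one from Beam): the anonymous
DoFn below holds a reference to the enclosing MyTransform instance, so
serializing the DoFn also serializes the PTransform.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class MyTransform extends PTransform<PCollection<String>, PCollection<Integer>> {
  private final int lengthLimit = 100;  // read via MyTransform.this inside the DoFn

  @Override
  public PCollection<Integer> expand(PCollection<String> input) {
    return input.apply(
        ParDo.of(
            new DoFn<String, Integer>() {  // anonymous inner class
              @ProcessElement
              public void process(@Element String word, OutputReceiver<Integer> out) {
                // Reading lengthLimit keeps MyTransform in the DoFn's closure.
                out.output(Math.min(word.length(), lengthLimit));
              }
            }));
  }
}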

On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath 
wrote:

> Seems like Read in PCollection refers to a transform, at least here:
> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>
> I'm in favour of separating construction time transforms from execution
> time data objects that we store in PCollections as Luke mentioned. Also, we
> don't guarantee that PTransform is serializable so users have the
> additional complexity of providing a coder whenever a PTransform is used
> as a data object.
> Also, agree with Boyuan that using simple Java objects that are
> convertible to Beam Rows allow us to make these transforms available to
> other SDKs through the cross-language transforms. Using transforms or
> complex sources as data objects will probably make this difficult.
>
> Thanks,
> Cham
>
>
>
> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang  wrote:
>
>> Hi Ismael,
>>
>> I think the ReadAll in the IO connector refers to the IO with SDF
>> implementation despite the type of input, where Read refers to
>> UnboundedSource.  One major pushback of using KafkaIO.Read as source
>> description is that not all configurations of KafkaIO.Read are meaningful
>> to populate during execution time. Also when thinking about x-lang usage,
>> making source description across language boundaries is also necessary.  As
>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
>> KafkaSourceDescription.java
>> .
>> Then the coder of this schema-aware object will be a SchemaCoder
>> .
>> When crossing language boundaries, it's also easy to convert a Row into the
>> source description: Convert.fromRows
>> 
>> .
>>
>>
>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik  wrote:
>>
>>> To provide additional context, the KafkaIO ReadAll transform takes a
>>> PCollection. This KafkaSourceDescriptor is a POJO
>>> that contains the configurable parameters for reading from Kafka. This is
>>> different from the pattern that Ismael listed because they take
>>> PCollection as input and the Read is the same as the Read PTransform
>>> class used for the non read all case.
>>>
>>> The KafkaSourceDescriptor does lead to duplication since parameters used
>>> to configure the transform have to be copied over to the source descriptor
>>> but decouples how a transform is specified from the object that describes
>>> what needs to be done. I believe Ismael's point is that we wouldn't need
>>> such a decoupling.
>>>
>>> Another area that hasn't been discussed and I believe is a non-issue is
>>> that the Beam Java SDK has the most IO connectors and we would want to use
>>> the IO implementations within Beam Go and Beam Python. This brings in its
>>> own set of issues related to versioning and compatibility for the wire
>>> format and how one parameterizes such transforms. The wire format issue can
>>> be solved with either approach by making sure that the cross language
>>> expansion always takes the well known format (whatever it may be) and
>>> converts it into Read/KafkaSourceDescriptor/... object that is then passed
>>> to the ReadAll transform. Boyuan has been looking to make the
>>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>>> this can be done easily using the AutoValue integration (I don't believe
>>> there is anything preventing someone from writing a schema row -> Read ->
>>> row adapter or also using the AutoValue configuration if the transform is
>>> also an AutoValue).
>>>
>>> I would be more for the code duplication and separation of concerns
>>> provided by using a different object to represent the contents of the
>>> PCollection from the pipeline construction time PTransform.
>>>
>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
>>> wrote:
>>>
 Hi Ismael,

 Thanks for taking this on. Have you considered an approach similar (or
 dual) to FileIO.write(), where we in a sense also have to configure a
 dynamic number different IO transforms of the same type (file writes)?

 E.g. how in this example we configure many aspects of many file writes:

 transactions.apply(FileIO.writeDynamic()
  .by(Transaction::getType)
  .via(tx -> tx.getType().toFields(tx),  // Convert the data to be
 written to CSVSink
   type -> new CSVSink(type.getFieldNames()))
  .to(".../path/to/")
  .withNaming(type -> defaultNaming(type + "-transact

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Chamikara Jayalath
Seems like Read in PCollection<Read> refers to a transform, at least here:
https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353

I'm in favour of separating construction-time transforms from execution-time
data objects that we store in PCollections, as Luke mentioned. Also, we
don't guarantee that PTransform is serializable, so users have the
additional complexity of providing a coder whenever a PTransform is used
as a data object.
Also, I agree with Boyuan that using simple Java objects that are
convertible to Beam Rows allows us to make these transforms available to
other SDKs through cross-language transforms. Using transforms or
complex sources as data objects will probably make this difficult.

Thanks,
Cham



On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang  wrote:

> Hi Ismael,
>
> I think the ReadAll in the IO connector refers to the IO with SDF
> implementation despite the type of input, where Read refers to
> UnboundedSource.  One major pushback of using KafkaIO.Read as source
> description is that not all configurations of KafkaIO.Read are meaningful
> to populate during execution time. Also when thinking about x-lang usage,
> making source description across language boundaries is also necessary.  As
> Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
> KafkaSourceDescription.java
> .
> Then the coder of this schema-aware object will be a SchemaCoder
> .
> When crossing language boundaries, it's also easy to convert a Row into the
> source description: Convert.fromRows
> 
> .
>
>
> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik  wrote:
>
>> To provide additional context, the KafkaIO ReadAll transform takes a
>> PCollection. This KafkaSourceDescriptor is a POJO
>> that contains the configurable parameters for reading from Kafka. This is
>> different from the pattern that Ismael listed because they take
>> PCollection as input and the Read is the same as the Read PTransform
>> class used for the non read all case.
>>
>> The KafkaSourceDescriptor does lead to duplication since parameters used
>> to configure the transform have to be copied over to the source descriptor
>> but decouples how a transform is specified from the object that describes
>> what needs to be done. I believe Ismael's point is that we wouldn't need
>> such a decoupling.
>>
>> Another area that hasn't been discussed and I believe is a non-issue is
>> that the Beam Java SDK has the most IO connectors and we would want to use
>> the IO implementations within Beam Go and Beam Python. This brings in its
>> own set of issues related to versioning and compatibility for the wire
>> format and how one parameterizes such transforms. The wire format issue can
>> be solved with either approach by making sure that the cross language
>> expansion always takes the well known format (whatever it may be) and
>> converts it into Read/KafkaSourceDescriptor/... object that is then passed
>> to the ReadAll transform. Boyuan has been looking to make the
>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>> this can be done easily using the AutoValue integration (I don't believe
>> there is anything preventing someone from writing a schema row -> Read ->
>> row adapter or also using the AutoValue configuration if the transform is
>> also an AutoValue).
>>
>> I would be more for the code duplication and separation of concerns
>> provided by using a different object to represent the contents of the
>> PCollection from the pipeline construction time PTransform.
>>
>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
>> wrote:
>>
>>> Hi Ismael,
>>>
>>> Thanks for taking this on. Have you considered an approach similar (or
>>> dual) to FileIO.write(), where we in a sense also have to configure a
>>> dynamic number different IO transforms of the same type (file writes)?
>>>
>>> E.g. how in this example we configure many aspects of many file writes:
>>>
>>> transactions.apply(FileIO.writeDynamic()
>>>  .by(Transaction::getType)
>>>  .via(tx -> tx.getType().toFields(tx),  // Convert the data to be
>>> written to CSVSink
>>>   type -> new CSVSink(type.getFieldNames()))
>>>  .to(".../path/to/")
>>>  .withNaming(type -> defaultNaming(type + "-transactions", ".csv"));
>>>
>>> we could do something similar for many JdbcIO reads:
>>>
>>> PCollection bars;  // user-specific type from which all the read
>>> parameters can be inferred
>>> PCollection moos = bars.apply(JdbcIO.readAll()
>>>   .fromQuery(bar -> ...compute query for this bar..

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Boyuan Zhang
Hi Ismael,

I think the ReadAll in the IO connector refers to the IO with an SDF
implementation, regardless of the type of input, whereas Read refers to
UnboundedSource. One major pushback against using KafkaIO.Read as the source
description is that not all configurations of KafkaIO.Read are meaningful to
populate during execution time. Also, when thinking about x-lang usage, making
the source description work across language boundaries is necessary. As Luke
mentioned, it's quite easy to infer a Schema from an AutoValue object:
KafkaSourceDescription.java. Then the coder of this schema-aware object will be
a SchemaCoder. When crossing language boundaries, it's also easy to convert a
Row into the source description: Convert.fromRows.
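
For illustration only (SourceDescription and its fields are made-up
placeholders, not the actual KafkaSourceDescription), this is roughly what
such a schema-aware AutoValue descriptor and the Row conversion look like:

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

@DefaultSchema(AutoValueSchema.class)  // lets Beam infer a Schema and use SchemaCoder
@AutoValue
public abstract class SourceDescription {
  public abstract String getTopic();
  public abstract Integer getPartition();

  public static SourceDescription of(String topic, Integer partition) {
    return new AutoValue_SourceDescription(topic, partition);
  }

  // After crossing a language boundary the payload arrives as Rows, which can
  // be converted back into the schema-aware type.
  public static PCollection<SourceDescription> fromRows(PCollection<Row> rows) {
    return rows.apply(Convert.fromRows(SourceDescription.class));
  }
}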


On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik  wrote:

> To provide additional context, the KafkaIO ReadAll transform takes a
> PCollection. This KafkaSourceDescriptor is a POJO
> that contains the configurable parameters for reading from Kafka. This is
> different from the pattern that Ismael listed because they take
> PCollection as input and the Read is the same as the Read PTransform
> class used for the non read all case.
>
> The KafkaSourceDescriptor does lead to duplication since parameters used
> to configure the transform have to be copied over to the source descriptor
> but decouples how a transform is specified from the object that describes
> what needs to be done. I believe Ismael's point is that we wouldn't need
> such a decoupling.
>
> Another area that hasn't been discussed and I believe is a non-issue is
> that the Beam Java SDK has the most IO connectors and we would want to use
> the IO implementations within Beam Go and Beam Python. This brings in its
> own set of issues related to versioning and compatibility for the wire
> format and how one parameterizes such transforms. The wire format issue can
> be solved with either approach by making sure that the cross language
> expansion always takes the well known format (whatever it may be) and
> converts it into Read/KafkaSourceDescriptor/... object that is then passed
> to the ReadAll transform. Boyuan has been looking to make the
> KafkaSourceDescriptor have a schema so it can be represented as a row and
> this can be done easily using the AutoValue integration (I don't believe
> there is anything preventing someone from writing a schema row -> Read ->
> row adapter or also using the AutoValue configuration if the transform is
> also an AutoValue).
>
> I would be more for the code duplication and separation of concerns
> provided by using a different object to represent the contents of the
> PCollection from the pipeline construction time PTransform.
>
> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
> wrote:
>
>> Hi Ismael,
>>
>> Thanks for taking this on. Have you considered an approach similar (or
>> dual) to FileIO.write(), where we in a sense also have to configure a
>> dynamic number different IO transforms of the same type (file writes)?
>>
>> E.g. how in this example we configure many aspects of many file writes:
>>
>> transactions.apply(FileIO.writeDynamic()
>>  .by(Transaction::getType)
>>  .via(tx -> tx.getType().toFields(tx),  // Convert the data to be
>> written to CSVSink
>>   type -> new CSVSink(type.getFieldNames()))
>>  .to(".../path/to/")
>>  .withNaming(type -> defaultNaming(type + "-transactions", ".csv"));
>>
>> we could do something similar for many JdbcIO reads:
>>
>> PCollection bars;  // user-specific type from which all the read
>> parameters can be inferred
>> PCollection moos = bars.apply(JdbcIO.readAll()
>>   .fromQuery(bar -> ...compute query for this bar...)
>>   .withMapper((bar, resultSet) -> new Moo(...))
>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>   ...etc);
>>
>>
>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía  wrote:
>>
>>> Hello,
>>>
>>> (my excuses for the long email but this requires context)
>>>
>>> As part of the move from Source based IOs to DoFn based ones. One pattern
>>> emerged due to the composable nature of DoFn. The idea is to have a
>>> different
>>> kind of composable reads where we take a PCollection of different sorts
>>> of
>>> intermediate specifications e.g. tables, queries, etc, for example:
>>>
>>> JdbcIO:
>>> ReadAll extends
>>> PTransform, PCollection>
>>>
>>> RedisIO:
>>> ReadAll extends PTransform, PCollection>> String>>>
>>>
>>> HBaseIO:
>>> ReadAll extends PTransform, PCollection>
>>>
>>> These patterns enabled richer use cases like doing multiple queries in
>>> the same
>>> Pipeline, querying based on key patterns or 

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Luke Cwik
To provide additional context, the KafkaIO ReadAll transform takes a
PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO
that contains the configurable parameters for reading from Kafka. This is
different from the pattern that Ismael listed because those take
PCollection<Read> as input, where the Read is the same Read PTransform class
used for the non-ReadAll case.

The KafkaSourceDescriptor does lead to duplication since parameters used to
configure the transform have to be copied over to the source descriptor but
decouples how a transform is specified from the object that describes what
needs to be done. I believe Ismael's point is that we wouldn't need such a
decoupling.

Another area that hasn't been discussed and I believe is a non-issue is
that the Beam Java SDK has the most IO connectors and we would want to use
the IO implementations within Beam Go and Beam Python. This brings in its
own set of issues related to versioning and compatibility for the wire
format and how one parameterizes such transforms. The wire format issue can
be solved with either approach by making sure that the cross language
expansion always takes the well known format (whatever it may be) and
converts it into Read/KafkaSourceDescriptor/... object that is then passed
to the ReadAll transform. Boyuan has been looking to make the
KafkaSourceDescriptor have a schema so it can be represented as a row and
this can be done easily using the AutoValue integration (I don't believe
there is anything preventing someone from writing a schema row -> Read ->
row adapter or also using the AutoValue configuration if the transform is
also an AutoValue).

I would be more for the code duplication and separation of concerns
provided by using a different object to represent the contents of the
PCollection from the pipeline construction time PTransform.
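
As a rough sketch of that row -> Read adapter (the field names here are
assumptions, not the actual KafkaSourceDescriptor schema):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.Row;

class RowToReadAdapter {
  // Rebuild a KafkaIO.Read from a Beam Row carrying the source configuration.
  static KafkaIO.Read<byte[], byte[]> fromRow(Row row) {
    return KafkaIO.readBytes()
        .withBootstrapServers(row.getString("bootstrapServers"))
        .withTopic(row.getString("topic"));
  }
}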

On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
wrote:

> Hi Ismael,
>
> Thanks for taking this on. Have you considered an approach similar (or
> dual) to FileIO.write(), where we in a sense also have to configure a
> dynamic number different IO transforms of the same type (file writes)?
>
> E.g. how in this example we configure many aspects of many file writes:
>
> transactions.apply(FileIO.writeDynamic()
>  .by(Transaction::getType)
>  .via(tx -> tx.getType().toFields(tx),  // Convert the data to be
> written to CSVSink
>   type -> new CSVSink(type.getFieldNames()))
>  .to(".../path/to/")
>  .withNaming(type -> defaultNaming(type + "-transactions", ".csv"));
>
> we could do something similar for many JdbcIO reads:
>
> PCollection bars;  // user-specific type from which all the read
> parameters can be inferred
> PCollection moos = bars.apply(JdbcIO.readAll()
>   .fromQuery(bar -> ...compute query for this bar...)
>   .withMapper((bar, resultSet) -> new Moo(...))
>   .withBatchSize(bar -> ...compute batch size for this bar...)
>   ...etc);
>
>
> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía  wrote:
>
>> Hello,
>>
>> (my excuses for the long email but this requires context)
>>
>> As part of the move from Source based IOs to DoFn based ones. One pattern
>> emerged due to the composable nature of DoFn. The idea is to have a
>> different
>> kind of composable reads where we take a PCollection of different sorts of
>> intermediate specifications e.g. tables, queries, etc, for example:
>>
>> JdbcIO:
>> ReadAll extends
>> PTransform, PCollection>
>>
>> RedisIO:
>> ReadAll extends PTransform, PCollection> String>>>
>>
>> HBaseIO:
>> ReadAll extends PTransform, PCollection>
>>
>> These patterns enabled richer use cases like doing multiple queries in
>> the same
>> Pipeline, querying based on key patterns or querying from multiple tables
>> at the
>> same time but came with some maintenance issues:
>>
>> - We ended up needing to add to the ReadAll transforms the parameters for
>>   missing information so we ended up with lots of duplicated with methods
>> and
>>   error-prone code from the Read transforms into the ReadAll transforms.
>>
>> - When you require new parameters you have to expand the input parameters
>> of the
>>   intermediary specification into something that resembles the full `Read`
>>   definition for example imagine you want to read from multiple tables or
>>   servers as part of the same pipeline but this was not in the
>> intermediate
>>   specification you end up adding those extra methods (duplicating more
>> code)
>>   just o get close to the be like the Read full spec.
>>
>> - If new parameters are added to the Read method we end up adding them
>>   systematically to the ReadAll transform too so they are taken into
>> account.
>>
>> Due to these issues I recently did a change to test a new approach that is
>> simpler, more complete and maintainable. The code became:
>>
>> HBaseIO:
>> ReadAll extends PTransform, PCollection>
>>
>> With this approach users gain benefits of improvements on parameters of
>> normal
>> Read because they count with the full Read paramet

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Eugene Kirpichov
Hi Ismael,

Thanks for taking this on. Have you considered an approach similar (or
dual) to FileIO.write(), where we in a sense also have to configure a
dynamic number of different IO transforms of the same type (file writes)?

E.g. how in this example we configure many aspects of many file writes:

transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
 .by(Transaction::getType)
 .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
      type -> new CSVSink(type.getFieldNames()))
 .to(".../path/to/")
 .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));

we could do something similar for many JdbcIO reads:

PCollection<Bar> bars;  // user-specific type from which all the read
parameters can be inferred
PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
  .fromQuery(bar -> ...compute query for this bar...)
  .withMapper((bar, resultSet) -> new Moo(...))
  .withBatchSize(bar -> ...compute batch size for this bar...)
  ...etc);


On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía  wrote:

> Hello,
>
> (my excuses for the long email but this requires context)
>
> As part of the move from Source based IOs to DoFn based ones. One pattern
> emerged due to the composable nature of DoFn. The idea is to have a
> different
> kind of composable reads where we take a PCollection of different sorts of
> intermediate specifications e.g. tables, queries, etc, for example:
>
> JdbcIO:
> ReadAll extends
> PTransform, PCollection>
>
> RedisIO:
> ReadAll extends PTransform, PCollection String>>>
>
> HBaseIO:
> ReadAll extends PTransform, PCollection>
>
> These patterns enabled richer use cases like doing multiple queries in the
> same
> Pipeline, querying based on key patterns or querying from multiple tables
> at the
> same time but came with some maintenance issues:
>
> - We ended up needing to add to the ReadAll transforms the parameters for
>   missing information so we ended up with lots of duplicated with methods
> and
>   error-prone code from the Read transforms into the ReadAll transforms.
>
> - When you require new parameters you have to expand the input parameters
> of the
>   intermediary specification into something that resembles the full `Read`
>   definition for example imagine you want to read from multiple tables or
>   servers as part of the same pipeline but this was not in the intermediate
>   specification you end up adding those extra methods (duplicating more
> code)
>   just o get close to the be like the Read full spec.
>
> - If new parameters are added to the Read method we end up adding them
>   systematically to the ReadAll transform too so they are taken into
> account.
>
> Due to these issues I recently did a change to test a new approach that is
> simpler, more complete and maintainable. The code became:
>
> HBaseIO:
> ReadAll extends PTransform, PCollection>
>
> With this approach users gain benefits of improvements on parameters of
> normal
> Read because they count with the full Read parameters. But of course there
> are
> some minor caveats:
>
> 1. You need to push some information into normal Reads for example
>partition boundaries information or Restriction information (in the SDF
>case).  Notice that this consistent approach of ReadAll produces a
> simple
>pattern that ends up being almost reusable between IOs (e.g. the
> non-SDF
>case):
>
>   public static class ReadAll extends PTransform,
> PCollection> {
> @Override
> public PCollection expand(PCollection input) {
>   return input
>   .apply("Split", ParDo.of(new SplitFn()))
>   .apply("Reshuffle", Reshuffle.viaRandomKey())
>   .apply("Read", ParDo.of(new ReadFn()));
> }
>   }
>
> 2. If you are using Generic types for the results ReadAll you must have the
>Coders used in its definition and require consistent types from the data
>sources, in practice this means we need to add extra withCoder
> method(s) on
>ReadAll but not the full specs.
>
>
> At the moment HBaseIO and SolrIO already follow this ReadAll pattern.
> RedisIO
> and CassandraIO have already WIP PRs to do so. So I wanted to bring this
> subject
> to the mailing list to see your opinions, and if you see any sort of
> issues that
> we might be missing with this idea.
>
> Also I would like to see if we have consensus to start using consistently
> the
> terminology of ReadAll transforms based on Read and the readAll() method
> for new
> IOs (at this point probably outdoing this in the only remaining
> inconsistent
> place in JdbcIO might not be a good idea but apart of this we should be
> ok).
>
> I mention this because the recent PR on KafkaIO based on SDF is doing
> something
> similar to the old pattern but being called ReadAll and maybe it is worth
> to be
> consistent for the benefit of users.
>
> Regards,
> Ismaël
>


Re: Running Beam pipeline using Spark on YARN

2020-06-24 Thread Kamil Wasilewski
Thanks for the information. So it looks like we can't easily run portable
pipelines on a Dataproc cluster at the moment.

> you can set --output_executable_path to create a jar that you can then
submit to yarn via spark-submit.

I tried to create a jar, but I ran into a problem. I left an error message
in a comment for https://issues.apache.org/jira/browse/BEAM-8970.


On Wed, Jun 24, 2020 at 1:25 AM Kyle Weaver  wrote:

> > So hopefully setting --spark-master-url to be yarn will work too.
>
> This is not supported.
>
> On Tue, Jun 23, 2020 at 2:58 PM Xinyu Liu  wrote:
>
>> I am doing some prototyping on this too. I used spark-submit script
>> instead of the rest api. In my simple setup, I ran
>> SparkJobServerDriver.main() directly in the AM as a spark job, which
>> will submit the python job to the default spark master url pointing to
>> "local". I also use --files in the spark-submit script to upload the python
>> packages and boot script. On the python side, I was using the following
>> pipeline options for submission (thanks to Thomas):
>>
>> pipeline_options = PipelineOptions([
>>     "--runner=PortableRunner",
>>     "--job_endpoint=your-job-server:8099",
>>     "--environment_type=PROCESS",
>>     "--environment_config={\"command\": \"./boot\"}"])
>>
>> I used my own boot script for customized python packaging. WIth this
>> setup I was able to get a simple hello-world program running. I haven't
>> tried to run the job server separately from the AM yet. So hopefully
>> setting --spark-master-url to be yarn will work too.
>>
>> Thanks,
>> Xinyu
>>
>> On Tue, Jun 23, 2020 at 12:18 PM Kyle Weaver  wrote:
>>
>>> Hi Kamil, there is a JIRA for this:
>>> https://issues.apache.org/jira/browse/BEAM-8970 It's theoretically
>>> possible but remains untested as far as I know :)
>>>
>>> As I indicated in a comment, you can set --output_executable_path to
>>> create a jar that you can then submit to yarn via spark-submit.
>>>
>>> If you can get this working, I'd additionally like to script the jar
>>> submission in python to save users the extra step.
>>>
>>> Thanks,
>>> Kyle
>>>
>>> On Tue, Jun 23, 2020 at 9:16 AM Kamil Wasilewski <
>>> kamil.wasilew...@polidea.com> wrote:
>>>
 Hi all,

 I'm trying to run a Beam pipeline using Spark on YARN. My pipeline is
 written in Python, so I need to use a portable runner. Does anybody know
 how I should configure job server parameters, especially
 --spark-master-url?  Is there anything else I need to be aware of while
 using such setup?

 If it makes a difference, I use Google Dataproc.

 Best,
 Kamil

>>>


Re: JIRA contributor permissions

2020-06-24 Thread Alexey Romanenko
Hi Brian,

Done. Welcome to the project!

> On 24 Jun 2020, at 01:52, Brian Michalski  wrote:
> 
> Greetings!
> 
> I'm wading my way through a few small Go SDK tickets. Can I have contributor 
> permissions on JIRA?  My username is bamnet.
> 
> Thanks,
> ~Brian M



[DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Ismaël Mejía
Hello,

(my excuses for the long email but this requires context)

As part of the move from Source-based IOs to DoFn-based ones, one pattern
emerged due to the composable nature of DoFn. The idea is to have a different
kind of composable read where we take a PCollection of different sorts of
intermediate specifications, e.g. tables, queries, etc., for example:

JdbcIO:
ReadAll<ParameterT, OutputT> extends
PTransform<PCollection<ParameterT>, PCollection<OutputT>>

RedisIO:
ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>

HBaseIO:
ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>

These patterns enabled richer use cases like doing multiple queries in the same
Pipeline, querying based on key patterns or querying from multiple tables at the
same time but came with some maintenance issues:

- We ended up needing to add to the ReadAll transforms the parameters for the
  missing information, so we ended up with lots of duplicated with* methods and
  error-prone code copied from the Read transforms into the ReadAll transforms.

- When you require new parameters you have to expand the input parameters of the
  intermediary specification into something that resembles the full `Read`
  definition. For example, imagine you want to read from multiple tables or
  servers as part of the same pipeline but this was not in the intermediate
  specification: you end up adding those extra methods (duplicating more code)
  just to get close to being like the full Read spec.

- If new parameters are added to the Read method we end up adding them
  systematically to the ReadAll transform too so they are taken into account.

Due to these issues I recently did a change to test a new approach that is
simpler, more complete and maintainable. The code became:

HBaseIO:
ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>

With this approach users gain the benefits of improvements to the parameters of
the normal Read because they have the full Read parameters available. But of
course there are some minor caveats:

1. You need to push some information into the normal Reads, for example
   partition boundary information or Restriction information (in the SDF
   case).  Notice that this consistent approach of ReadAll produces a simple
   pattern that ends up being almost reusable between IOs (e.g. the non-SDF
   case):

  public static class ReadAll extends PTransform<PCollection<Read>,
      PCollection<Result>> {
    @Override
    public PCollection<Result> expand(PCollection<Read> input) {
      return input
          .apply("Split", ParDo.of(new SplitFn()))
          .apply("Reshuffle", Reshuffle.viaRandomKey())
          .apply("Read", ParDo.of(new ReadFn()));
    }
  }

2. If you are using generic types for the ReadAll results, you must have the
   Coders used in its definition and require consistent types from the data
   sources; in practice this means we need to add extra withCoder method(s) on
   ReadAll, but not the full specs.
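
   As a rough sketch of that extra hook (reusing the placeholder Read, Result,
   SplitFn and ReadFn names from the snippet above, and assuming a ReadFn that
   is generic in its output), withCoder could look like:

  public static class ReadAll<OutputT>
      extends PTransform<PCollection<Read>, PCollection<OutputT>> {
    private Coder<OutputT> coder;

    public ReadAll<OutputT> withCoder(Coder<OutputT> coder) {
      this.coder = coder;
      return this;
    }

    @Override
    public PCollection<OutputT> expand(PCollection<Read> input) {
      return input
          .apply("Split", ParDo.of(new SplitFn()))
          .apply("Reshuffle", Reshuffle.viaRandomKey())
          .apply("Read", ParDo.of(new ReadFn<OutputT>()))
          .setCoder(coder);  // needed because OutputT cannot be inferred
    }
  }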


At the moment HBaseIO and SolrIO already follow this ReadAll pattern. RedisIO
and CassandraIO already have WIP PRs to do so. So I wanted to bring this subject
to the mailing list to hear your opinions, and to see if there are any issues
that we might be missing with this idea.

Also I would like to see if we have consensus to start consistently using the
terminology of ReadAll transforms based on Read and the readAll() method for new
IOs (at this point, doing this in the only remaining inconsistent place, JdbcIO,
might not be a good idea, but apart from this we should be ok).

I mention this because the recent SDF-based PR on KafkaIO is doing something
similar to the old pattern but is called ReadAll, and maybe it is worth being
consistent for the benefit of users.

Regards,
Ismaël