Re: Questions about the bundled PubsubIO read implementation

2019-07-10 Thread Steve Niemitz
Oh, one other important thing I forgot to mention is that I can't reproduce
(the empty message issue at least) locally on the DirectRunner.

On Wed, Jul 10, 2019 at 6:04 PM Steve Niemitz  wrote:

> Thanks for making JIRAs for these; I was going to, I just wanted to do a
> sanity check first. :)
>
> I reproduced them all with the stock PubsubIO first, and then again with
> the gRPC client.  I can try to throw together a much more minimal repro
> case too.
>
> On Wed, Jul 10, 2019 at 4:21 PM Kenneth Knowles  wrote:
>
>> This is pretty surprising. Seems valuable to file separate Jiras so we
>> can track investigation and resolution.
>>
>>  - use gRPC: https://issues.apache.org/jira/browse/BEAM-7718
>>  - empty message bodies: https://issues.apache.org/jira/browse/BEAM-7716
>>  - watermark tracking: https://issues.apache.org/jira/browse/BEAM-7717
>>
>> You reproduced these with the original PubsubIO?
>>
>> Kenn
>>
>> On Mon, Jul 8, 2019 at 10:38 AM Steve Niemitz 
>> wrote:
>>
>>> I was trying to use the bundled PubsubIO.Read implementation in beam on
>>> dataflow (using --experiments=enable_custom_pubsub_source to prevent
>>> dataflow from overriding it with its own implementation) and ran into some
>>> interesting issues.  I was curious if people have any experience with
>>> these.  I'd assume anyone using PubsubIO on a runner other than dataflow
>>> would have run into the same things.
>>>
>>> - The default implementation uses the HTTP REST API, which seems to be
>>> much less performant than the gRPC implementation.  Is there a reason that
>>> the gRPC implementation is essentially unavailable from the public API?
>>> PubsubIO.Read.withClientFactory is package private.  I worked around this
>>> by making it public and rebuilding, which led me to...
>>>
>>> - Both the JSON and gRPC implementations return empty message bodies for
>>> all messages read (using readMessages).  When running with the
>>> dataflow-specific reader, this doesn't happen and the message bodies have
>>> the content as expected.  I took a pipeline that works as expected on
>>> dataflow using PubsubIO.Read, added the experiment flag, and then my
>>> pipeline broke from empty message bodies.  This obviously blocked me from
>>> really experimenting much more.
>>>
>>> - The watermark tracking seems off.  The dataflow UI was reporting my
>>> watermark as around (but not exactly) the epoch (it was ~1970-01-19), which
>>> makes me wonder if seconds/milliseconds got confused somewhere (i.e., if you
>>> take the current time since the epoch in seconds and interpret it as
>>> milliseconds, you'll end up somewhere around 1970-01-19).
>>>
>>
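
For reference, a minimal sketch of the setup described above (untested and
illustrative only; the project/subscription names are placeholders, and it
assumes a 2.x-era Java SDK where the Dataflow options expose setExperiments):

    import java.util.Arrays;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class CustomPubsubSourceExample {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        // Keep Dataflow from swapping in its native Pub/Sub read so the bundled
        // Beam implementation discussed above is actually exercised.
        options.setExperiments(Arrays.asList("enable_custom_pubsub_source"));

        Pipeline p = Pipeline.create(options);
        PCollection<PubsubMessage> messages =
            p.apply("ReadPubsub",
                PubsubIO.readMessages()
                    .fromSubscription("projects/my-project/subscriptions/my-sub"));
        // PubsubIO.Read.withClientFactory(...) is package-private as of this
        // writing, so switching to the gRPC client still needs the rebuild
        // described above.
        // ... downstream transforms ...
        p.run();
      }
    }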


Re: Questions about the bundled PubsubIO read implementation

2019-07-10 Thread Steve Niemitz
Thanks for making JIRAs for these; I was going to, I just wanted to do a
sanity check first. :)

I reproduced them all with the stock PubsubIO first, and then again with
the gRPC client.  I can try to throw together a much more minimal repro
case too.

On Wed, Jul 10, 2019 at 4:21 PM Kenneth Knowles  wrote:

> This is pretty surprising. Seems valuable to file separate Jiras so we can
> track investigation and resolution.
>
>  - use gRPC: https://issues.apache.org/jira/browse/BEAM-7718
>  - empty message bodies: https://issues.apache.org/jira/browse/BEAM-7716
>  - watermark tracking: https://issues.apache.org/jira/browse/BEAM-7717
>
> You reproduced these with the original PubsubIO?
>
> Kenn
>
> On Mon, Jul 8, 2019 at 10:38 AM Steve Niemitz  wrote:
>
>> I was trying to use the bundled PubsubIO.Read implementation in beam on
>> dataflow (using --experiments=enable_custom_pubsub_source to prevent
>> dataflow from overriding it with its own implementation) and ran into some
>> interesting issues.  I was curious if people have any experience with
>> these.  I'd assume anyone using PubsubIO on a runner other than dataflow
>> would have run into the same things.
>>
>> - The default implementation uses the HTTP REST API, which seems to be
>> much less performant than the gRPC implementation.  Is there a reason that
>> the gRPC implementation is essentially unavailable from the public API?
>> PubsubIO.Read.withClientFactory is package private.  I worked around this
>> by making it public and rebuilding, which led me to...
>>
>> - Both the JSON and gRPC implementations return empty message bodies for
>> all messages read (using readMessages).  When running with the
>> dataflow-specific reader, this doesn't happen and the message bodies have
>> the content as expected.  I took a pipeline that works as expected on
>> dataflow using PubsubIO.Read, added the experiment flag, and then my
>> pipeline broke from empty message bodies.  This obviously blocked me from
>> really experimenting much more.
>>
>> - The watermark tracking seems off.  The dataflow UI was reporting my
>> watermark as around (but not exactly) the epoch (it was ~1970-01-19), which
>> makes me wonder if seconds/milliseconds got confused somewhere (i.e., if you
>> take the current time since the epoch in seconds and interpret it as
>> milliseconds, you'll end up somewhere around 1970-01-19).
>>
>


Re: Questions about the bundled PubsubIO read implementation

2019-07-10 Thread Kenneth Knowles
This is pretty surprising. Seems valuable to file separate Jiras so we can
track investigation and resolution.

 - use gRPC: https://issues.apache.org/jira/browse/BEAM-7718
 - empty message bodies: https://issues.apache.org/jira/browse/BEAM-7716
 - watermark tracking: https://issues.apache.org/jira/browse/BEAM-7717

You reproduced these with the original PubsubIO?

Kenn

On Mon, Jul 8, 2019 at 10:38 AM Steve Niemitz  wrote:

> I was trying to use the bundled PubsubIO.Read implementation in beam on
> dataflow (using --experiments=enable_custom_pubsub_source to prevent
> dataflow from overriding it with its own implementation) and ran into some
> interesting issues.  I was curious if people have any experience with
> these.  I'd assume anyone using PubsubIO on a runner other than dataflow
> would have run into the same things.
>
> - The default implementation uses the HTTP REST API, which seems to be
> much less performant than the gRPC implementation.  Is there a reason that
> the gRPC implementation is essentially unavailable from the public API?
> PubsubIO.Read.withClientFactory is package private.  I worked around this
> by making it public and rebuilding, which led me to...
>
> - Both the JSON and gRPC implementations return empty message bodies for
> all messages read (using readMessages).  When running with the
> dataflow-specific reader, this doesn't happen and the message bodies have
> the content as expected.  I took a pipeline that works as expected on
> dataflow using PubsubIO.Read, added the experiment flag, and then my
> pipeline broke from empty message bodies.  This obviously blocked me from
> really experimenting much more.
>
> - The watermark tracking seems off.  The dataflow UI was reporting my
> watermark as around (but not exactly) the epoch (it was ~1970-01-19), which
> makes me wonder if seconds/milliseconds got confused somewhere (i.e., if you
> take the current time since the epoch in seconds and interpret it as
> milliseconds, you'll end up somewhere around 1970-01-19).
>
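
A quick way to check the seconds/milliseconds theory from the watermark point
above (the timestamp is just an illustrative "now" from July 2019):

    import java.time.Instant;

    public class EpochUnitsCheck {
      public static void main(String[] args) {
        long nowInSeconds = 1_562_716_800L; // ~2019-07-10 as seconds since the epoch
        // Passing a value that is actually in seconds to an API that expects
        // milliseconds lands about 18 days after the epoch:
        System.out.println(Instant.ofEpochMilli(nowInSeconds));
        // prints 1970-01-19T02:05:16.800Z, right around the watermark in the UI
      }
    }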


Re: [Opinion] [Question] Python SDK & Java SDK

2019-07-10 Thread Lukasz Cwik
Age is the largest consideration, since the Python SDK was started a few
years after the Java one. Another consideration is that the Python SDK only
worked on Dataflow; only recently, thanks to the portability work, have a
few other runners been able to execute Python pipelines. Now that there are
several runners, the excitement and development pace around Python have
sped up significantly.

Improving the documentation to show examples across multiple languages is a
simple way new contributors can really help the project.

On Wed, Jul 10, 2019 at 6:55 AM Shannon Duncan 
wrote:

> So I know going into this question that there will be varying opinions.
> However, I've noticed some things since starting with Beam full time a few
> weeks ago.
>
> 1. Python is a second-party SDK to Beam and doesn't seem to be at feature
> parity with Java.
> 2. Even in supporting modules like fastavro, Python still doesn't match up
> with Java's features.
> 3. Almost all tutorials and documentation around Beam and Big Data are
> done in Java, making it harder to learn the Python side of things.
>
> So with these observations I'm curious: is it just the age of the Python
> SDK that explains the lack of feature parity?
>
> I'm also curious, are there any noticeable performance differences with
> using Python SDK vs Java SDK in dataflow?
>
> Thanks,
> Shannon
>


Re: [Python] Read Hadoop Sequence File?

2019-07-10 Thread Shannon Duncan
If I wanted to go ahead and include this within a new Java pipeline, what
level of work would I be looking at to integrate it?

On Wed, Jul 3, 2019 at 3:54 AM Ismaël Mejía  wrote:

> That's great. I can help whenever you need. We just need to choose its
> destination. Both the `hadoop-format` and `hadoop-file-system` modules
> are good candidates; I would even be inclined to put it in its own
> module, `sdks/java/extensions/sequencefile`, to make it easier for end
> users to discover.
>
> One thing to consider is the SeekableByteChannel adapters; we can move
> those into hadoop-common if needed and refactor the modules to share
> code. It's worth taking a look at
>
> org.apache.beam.sdk.io.hdfs.HadoopFileSystem.HadoopSeekableByteChannel#HadoopSeekableByteChannel
> to see if some of it could be useful.
>
> On Tue, Jul 2, 2019 at 11:46 PM Igor Bernstein 
> wrote:
> >
> > Hi all,
> >
> > > I wrote those classes with the intention of upstreaming them to Beam. I
> can try to make some time this quarter to clean them up. I would need a bit
> of guidance from a Beam expert on how to make them coexist with
> HadoopFormatIO, though.
> >
> >
> > On Tue, Jul 2, 2019 at 10:55 AM Solomon Duskis 
> wrote:
> >>
> >> +Igor Bernstein who wrote the Cloud Bigtable Sequence File classes.
> >>
> >> Solomon Duskis | Google Cloud clients | sdus...@google.com |
> 914-462-0531
> >>
> >>
> >> On Tue, Jul 2, 2019 at 4:57 AM Ismaël Mejía  wrote:
> >>>
> >>> (Adding dev@ and Solomon Duskis to the discussion)
> >>>
> >>> I was not aware of these; thanks for sharing, David. It would definitely
> >>> be a great addition if we could have those donated as an extension on
> >>> the Beam side. We can even evolve them in the future to be more
> >>> FileIO-like. Any chance this can happen? Maybe Solomon and his team?
> >>>
> >>>
> >>>
> >>> On Tue, Jul 2, 2019 at 9:39 AM David Morávek  wrote:
> >>> >
> >>> > Hi, you can use SequenceFileSink and SequenceFileSource from the Cloud
> Bigtable client. Those work nicely with FileIO.
> >>> >
> >>> >
> https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSink.java
> >>> >
> https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSource.java
> >>> >
> >>> > It would be really cool to move these into Beam, but that's up to
> Googlers to decide whether they want to donate this.
> >>> >
> >>> > D.
> >>> >
> >>> > On Tue, Jul 2, 2019 at 2:07 AM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
> >>> >>
> >>> >> It's not outside the realm of possibility. For now I've created
> an intermediate step: a Hadoop job that converts from sequence files to
> text files.
> >>> >>
> >>> >> Looking into better options.
> >>> >>
> >>> >> On Mon, Jul 1, 2019, 5:50 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>> >>>
> >>> >>> The Java SDK has HadoopFormatIO (HadoopInputFormatIO in older releases),
> which you should be able to use to read sequence files:
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
> >>> >>> I don't think there's a direct alternative for this for Python.
> >>> >>>
> >>> >>> Is it possible to write to a well-known format such as Avro
> instead of a Hadoop-specific format? That would allow you to read from both
> Dataproc/Hadoop and the Beam Python SDK.
> >>> >>>
> >>> >>> Thanks,
> >>> >>> Cham
> >>> >>>
> >>> >>> On Mon, Jul 1, 2019 at 3:37 PM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
> >>> 
> >>>  That's a pretty big hole, a missing source/sink, when looking
> at transitioning from Dataproc to Dataflow using GCS as the storage buffer
> instead of a traditional HDFS.
> >>> 
> >>>  From what I've been able to tell from the source code and
> documentation, Java is able to do this but Python is not?
> >>> 
> >>>  Thanks,
> >>>  Shannon
> >>> 
> >>>  On Mon, Jul 1, 2019 at 5:29 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>> >
> >>> > I don't think we have a source/sink for reading Hadoop sequence
> files. Your best bet currently will probably be to use the FileSystem
> abstraction to open the file from a ParDo and read directly from there
> using a library that can read sequence files.
> >>> >
> >>> > Thanks,
> >>> > Cham
> >>> >
> >>> > On Mon, Jul 1, 2019 at 8:42 AM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
> >>> >>
> >>> >> I'm wanting to read a Hadoop Sequence/Map file stored on
> Google Cloud Storage (at "gs://bucket/link/SequenceFile-*") via the
> Python SDK.
> >>> >>
> >>> >> I cannot locate any good adapters for this, and the one Hadoop
> Filesystem reader seems to only read from an "hdfs://" URL.
> >>> >>
> >>> >> I'm wanting to use Dataflow and GCS exclusively to start mixing
> in Beam pipelines
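
Following up on Cham's pointer to HadoopFormatIO above, a rough sketch of what
reading a sequence file in a Java pipeline might look like (untested; the GCS
path and Writable types are placeholders, and the configuration keys are the
ones described in the HadoopFormatIO javadoc):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    public class ReadSequenceFileExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Input path; gs:// works when the GCS Hadoop connector is on the classpath.
        conf.set("mapreduce.input.fileinputformat.inputdir",
            "gs://bucket/link/SequenceFile-*");
        conf.setClass("mapreduce.job.inputformat.class",
            SequenceFileInputFormat.class, InputFormat.class);
        conf.setClass("key.class", Text.class, Object.class);
        conf.setClass("value.class", Text.class, Object.class);

        Pipeline p = Pipeline.create();
        PCollection<KV<Text, Text>> records =
            p.apply("ReadSequenceFile",
                HadoopFormatIO.<Text, Text>read().withConfiguration(conf));
        // ... downstream transforms ...
        p.run();
      }
    }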

Re: [Python SDK] Avro read/write & Indexing

2019-07-10 Thread Shannon Duncan
I'm not finding the reference to `DataFileReader` in the Python SDK. Do you
have a link to the source code or API documentation for that?

On Wed, Jul 10, 2019 at 5:46 AM Reza Rokni  wrote:

> Hi,
>
> I have not tried this (and don't have a chance to test it at the moment),
> so apologies if it's incorrect, but could you use something like
> the DataFileReader within a DoFn to get access to your key? It looks like
> it has seek/sync methods that might work for this. Assuming, of course,
> that the data for the key is small enough to not need to be parallelized on
> the read.
>
> Cheers
> Reza
>
>
>
> On Tue, 9 Jul 2019 at 23:52, Lukasz Cwik  wrote:
>
>> Typically this would be done by reading in the contents of the entire
>> file into a map side input and then consuming that side input within a DoFn.
>>
>> Unfortunately, only Dataflow supports really large side inputs with an
>> efficient access pattern and only when using Beam Java for bounded
>> pipelines. Support for really large side inputs for Beam Python bounded
>> pipelines on Dataflow is coming but not yet available.
>>
>> Otherwise, you could still read the Avro files, create a map, and
>> store the index as a side input; as long as the index fits in memory,
>> this would work well across all runners.
>>
>> The programming guide[1] has a basic example on how to get started using
>> side inputs.
>>
>> 1: https://beam.apache.org/documentation/programming-guide/#side-inputs
>>
>>
>> On Tue, Jul 9, 2019 at 2:21 PM Shannon Duncan 
>> wrote:
>>
>>> So, being pretty new to Beam and big data, I have been working on
>>> standardizing some input/output items for different
>>> Hadoop/Beam/Spark/BigQuery jobs and processes.
>>>
>>> So what I'm working on is having them all read/write Avro files, which is
>>> actually pretty straightforward. So basic read/write I have down.
>>>
>>> What I'm looking for, and hoping someone on this list knows, is how to
>>> index an Avro file and be able to search quickly through that index to
>>> open only part of an Avro file in Beam.
>>>
>>> For example, our pipeline is currently able to do this with Hadoop and
>>> sequence files, since they store keys with byte offsets.
>>>
>>> So, given a key, I'd like to pull only that key from the Avro file,
>>> reducing IO/network costs.
>>>
>>> Any ideas, thoughts, suggestions?
>>>
>>> Thanks!
>>> Shannon
>>>
>>
>
>
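
To make the DataFileReader suggestion above a bit more concrete, a very rough
Java sketch of a DoFn that jumps to a single known block of an Avro file
(untested; it assumes the sync positions were recorded in an index when the
files were written, e.g. via DataFileWriter.sync(), and that FileSystems.open
returns a seekable channel for the path, as it does for GCS):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SeekableByteChannel;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.file.SeekableInput;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    /** Reads only the Avro block that starts at a known sync position of a file. */
    class ReadAvroBlockFn extends DoFn<KV<String, Long>, GenericRecord> {

      /** Minimal adapter from a seekable NIO channel to Avro's SeekableInput. */
      private static class SeekableChannelInput implements SeekableInput {
        private final SeekableByteChannel channel;

        SeekableChannelInput(SeekableByteChannel channel) {
          this.channel = channel;
        }

        @Override public void seek(long p) throws IOException { channel.position(p); }
        @Override public long tell() throws IOException { return channel.position(); }
        @Override public long length() throws IOException { return channel.size(); }

        @Override public int read(byte[] b, int off, int len) throws IOException {
          return channel.read(ByteBuffer.wrap(b, off, len));
        }

        @Override public void close() throws IOException { channel.close(); }
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws IOException {
        String filePath = c.element().getKey();      // e.g. "gs://bucket/path/file.avro"
        long syncPosition = c.element().getValue();  // block position from the index

        MatchResult.Metadata metadata = FileSystems.matchSingleFileSpec(filePath);
        // GCS (and local files) hand back a seekable channel, which Avro needs.
        SeekableByteChannel channel =
            (SeekableByteChannel) FileSystems.open(metadata.resourceId());

        try (DataFileReader<GenericRecord> reader =
            new DataFileReader<>(
                new SeekableChannelInput(channel), new GenericDatumReader<GenericRecord>())) {
          reader.seek(syncPosition);                 // jump straight to the known block
          while (reader.hasNext() && !reader.pastSync(syncPosition)) {
            c.output(reader.next());                 // emit only that block's records
          }
        }
      }
    }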


[Opinion] [Question] Python SDK & Java SDK

2019-07-10 Thread Shannon Duncan
So I know going into this question that there will be varying opinions.
However, I've noticed some things since starting with Beam full time a few
weeks ago.

1. Python is a second-party SDK to Beam and doesn't seem to be at feature
parity with Java.
2. Even in supporting modules like fastavro, Python still doesn't match up
with Java's features.
3. Almost all tutorials and documentation around Beam and Big Data are done
in Java, making it harder to learn the Python side of things.

So with these observations I'm curious: is it just the age of the Python
SDK that explains the lack of feature parity?

I'm also curious, are there any noticeable performance differences with
using Python SDK vs Java SDK in dataflow?

Thanks,
Shannon


Re: [Python SDK] Avro read/write & Indexing

2019-07-10 Thread Reza Rokni
Hi,

I have not tried this (and don't have a chance to test it at the moment),
so apologies if it's incorrect, but could you use something like
the DataFileReader within a DoFn to get access to your key? It looks like
it has seek/sync methods that might work for this. Assuming, of course,
that the data for the key is small enough to not need to be parallelized on
the read.

Cheers
Reza



On Tue, 9 Jul 2019 at 23:52, Lukasz Cwik  wrote:

> Typically this would be done by reading in the contents of the entire file
> into a map side input and then consuming that side input within a DoFn.
>
> Unfortunately, only Dataflow supports really large side inputs with an
> efficient access pattern and only when using Beam Java for bounded
> pipelines. Support for really large side inputs for Beam Python bounded
> pipelines on Dataflow is coming but not yet available.
>
> Otherwise, you could still read the Avro files, create a map, and
> store the index as a side input; as long as the index fits in memory,
> this would work well across all runners.
>
> The programming guide[1] has a basic example on how to get started using
> side inputs.
>
> 1: https://beam.apache.org/documentation/programming-guide/#side-inputs
>
>
> On Tue, Jul 9, 2019 at 2:21 PM Shannon Duncan 
> wrote:
>
>> So, being pretty new to Beam and big data, I have been working on
>> standardizing some input/output items for different
>> Hadoop/Beam/Spark/BigQuery jobs and processes.
>>
>> So what I'm working on is having them all read/write Avro files, which is
>> actually pretty straightforward. So basic read/write I have down.
>>
>> What I'm looking for, and hoping someone on this list knows, is how to
>> index an Avro file and be able to search quickly through that index to
>> open only part of an Avro file in Beam.
>>
>> For example, our pipeline is currently able to do this with Hadoop and
>> sequence files, since they store keys with byte offsets.
>>
>> So, given a key, I'd like to pull only that key from the Avro file,
>> reducing IO/network costs.
>>
>> Any ideas, thoughts, suggestions?
>>
>> Thanks!
>> Shannon
>>
>
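
For reference, a minimal sketch of the map side input pattern Lukasz
describes above, in the Java SDK (untested; the index contents and types are
placeholders, and as he notes it assumes the index fits in memory):

    import java.util.Map;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    public class MapSideInputExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // Placeholder for the index: key -> byte offset (in practice this would
        // be built by reading the Avro files, e.g. with AvroIO or a ParDo).
        PCollection<KV<String, Long>> index =
            p.apply("BuildIndex", Create.of(KV.of("some-key", 12345L)));

        // Materialize the index as a map side input.
        final PCollectionView<Map<String, Long>> indexView =
            index.apply("AsMap", View.asMap());

        // The keys we actually want to look up.
        PCollection<String> keys = p.apply("Keys", Create.of("some-key"));

        keys.apply("Lookup", ParDo.of(new DoFn<String, KV<String, Long>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Map<String, Long> idx = c.sideInput(indexView);
            Long offset = idx.get(c.element());
            if (offset != null) {
              c.output(KV.of(c.element(), offset));
            }
          }
        }).withSideInputs(indexView));

        p.run();
      }
    }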
