Re: Proposed API for a Whole File IO

2017-08-14 Thread Chris Hebert
I opened a pull request with my initial implementation of WholeFileIO:
https://github.com/apache/beam/pull/3717

It has a few TODO items left, but feel free to review it and comment on my
approach.

On Tue, Aug 8, 2017 at 4:37 PM, Chamikara Jayalath 
wrote:

> I also agree that it'll be cleaner to write your elements to files using a
> ParDo instead of trying to use FileBasedSink. FileBasedSink is specifically
> designed for writing bundles of elements to single files of a given
> file-type. Same goes for FileBasedSource when reading. You can use the
> FileSystem abstraction to abstract out reading from/writing to different
> file-systems.
>
> BTW, I'd appreciate it if you could create Python SDK versions of these JIRAs
> as well and update them once the Java SDK versions are finalized. Even better
> if you can implement the Python SDK versions too :).
>
> Thanks,
> Cham
>
> On Mon, Aug 7, 2017 at 3:14 PM Eugene Kirpichov
>  wrote:
>
> > Hi! Thanks for filing the tickets.
> >
> > On Mon, Aug 7, 2017 at 1:44 PM Chris Hebert <
> > chris.hebert-...@digitalreasoning.com> wrote:
> >
> > > Hello again,
> > >
> > > I created tickets to capture these requests:
> > > https://issues.apache.org/jira/browse/BEAM-2750
> > > https://issues.apache.org/jira/browse/BEAM-2751
> > >
> > > I've started working on the Write part.
> > >
> > > Robert, after some time working on this, I'm unable to see how these
> > > objectives can be "entirely done in the context of a DoFn". Could you
> > lend
> > > a hint?
> > >
> > > I assume you didn't mean for me to append to my pipeline a ParDo.of(new
> > > DoFn... that manually writes out to some file location, did you? That
> > would
> > > lose all the benefits of the IO/Sink classes.
> > >
> > Which benefits? I think nearly everything that FileBasedSink does is
> > unnecessary for WholeFileIO.write() - I agree with Robert that it can be
> > implemented with a ParDo (I suppose he meant that WholeFileIO.write() can
> > be implemented by expanding it into a single ParDo).
> >
> >
> > >
> > > That said, I've found the "sharding" logic to be deeply embedded in all the
> > > FileBasedSink derivatives, and my attempt at sidestepping this logic isn't
> > > going very well. I managed to write a FileBasedSink that writes byte[] out
> > > (and that works correctly), but figuring out how to get the FilenamePolicy
> > > to be different for each element written out seems counter to the intent of
> > > these classes.
> > >
> > > Chris
> > >
> > > On Wed, Aug 2, 2017 at 10:23 AM, Reuven Lax 
> > > wrote:
> > >
> > > > On Wed, Aug 2, 2017 at 7:49 AM, Chris Hebert <
> > > > chris.hebert-...@digitalreasoning.com> wrote:
> > > >
> > > > > Thanks for the feedback!
> > > > >
> > > > > Aggregated thoughts:
> > > > >
> > > > >1. Warn users about large files (like 5GB large)
> > > > >
> > > >
> > > > I would set the threshold smaller. Also remember that while you may warn,
> > > > some runners might simply fail to process the record, causing pipelines to
> > > > either get stuck or fail altogether.
> > > >
> > > >
> > > > >2. Filenames can stick with contents via PCollection<KV<filename, contents>>
> > > > >3. InputStreams can't be encoded directly, but could be
> referenced
> > > in
> > > > a
> > > > >FileWrapper object
> > > > >4. Be mindful of sink race conditions with multiple workers; make sure
> > > > >failed workers clean up incompletely written files
> > > > >5. File systems often do better with few large files than many
> > small
> > > > > ones
> > > > >6. Most/all of this can be done in the context of a DoFn
> > > > >
> > > > > # Regarding point 1 and point 2
> > > > > Yes!
> > > > >
> > > > > # Regarding point 3:
> > > > >
> > > > > ## Approach A:
> > > > > When the FileWrapper is encoded, it must somehow encode a reference
> > to
> > > > the
> > > > > InputStream it is associated with, so that when the FileWrapper is
> > > > decoded
> > > > > it can pick up that InputStream again. My Java knowledge isn't deep
> > > > enough
> > > > > to know how one would do that with hashcodes and object lookup and
> > > such,
> > > > > but I could sidestep that entirely by simply encoding the filepath
> > with
> > > > the
> > > > > FileWrapper, then open up a new InputStream on that file path every
> > > time
> > > > > the FileWrapper is decoded.
> > > > >
> > > > > ## Approach B:
> > > > > An alternative to the above technique is to simply pass a byte[]
> > array,
> > > > > like so:
> > > > > PCollection<KV<String, byte[]>> fileNamesAndBytes = p.apply("Read",
> > > > > WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > >
> > > > > That would solve the encoding problem, allow users to get whatever
> > they
> > > > > want out of it with a ByteArrayInputStream, AND put a hard limit on
> > the
> > > > > size of incoming files at just below 2 GB (if my math is right).
> > (This
> > > is
> > > > > large enough for my use case, at present.)
> > > > >
> 

Re: Proposed API for a Whole File IO

2017-08-08 Thread Chamikara Jayalath
I also agree that it'll be cleaner to write your elements to files using a
ParDo instead of trying to use FileBasedSink. FileBasedSink is specifically
designed for writing bundles of elements to single files of a given
file-type. Same goes for FileBasedSource when reading. You can use the
FileSystem abstraction to abstract out reading from/writing to different
file-systems.
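
For concreteness, here's a rough sketch of what the read side could look like
on top of FileSystems. It's purely illustrative: the transform names, the
KV<String, byte[]> element type, and the single-glob input are all assumptions
rather than a settled API (p is the Pipeline).

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.util.Collections;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

PCollection<KV<String, byte[]>> files = p
    .apply(Create.of("/path/to/input/dir/*"))
    // Expand the glob into concrete file paths via the FileSystems API.
    .apply("ExpandGlob", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c) throws IOException {
        for (MatchResult result :
            FileSystems.match(Collections.singletonList(c.element()))) {
          for (MatchResult.Metadata metadata : result.metadata()) {
            c.output(metadata.resourceId().toString());
          }
        }
      }
    }))
    // Read each matched file whole, keyed by its path.
    .apply("ReadWholeFile", ParDo.of(new DoFn<String, KV<String, byte[]>>() {
      @ProcessElement
      public void process(ProcessContext c) throws IOException {
        MatchResult.Metadata metadata = FileSystems.matchSingleFileSpec(c.element());
        try (InputStream in =
            Channels.newInputStream(FileSystems.open(metadata.resourceId()))) {
          ByteArrayOutputStream bytes = new ByteArrayOutputStream();
          byte[] buffer = new byte[8192];
          for (int n; (n = in.read(buffer)) != -1; ) {
            bytes.write(buffer, 0, n);
          }
          c.output(KV.of(c.element(), bytes.toByteArray()));
        }
      }
    }));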

BTW, I'd appreciate it if you could create Python SDK versions of these JIRAs
as well and update them once the Java SDK versions are finalized. Even better
if you can implement the Python SDK versions too :).

Thanks,
Cham

On Mon, Aug 7, 2017 at 3:14 PM Eugene Kirpichov
 wrote:

> Hi! Thanks for filing the tickets.
>
> On Mon, Aug 7, 2017 at 1:44 PM Chris Hebert <
> chris.hebert-...@digitalreasoning.com> wrote:
>
> > Hello again,
> >
> > I created tickets to capture these requests:
> > https://issues.apache.org/jira/browse/BEAM-2750
> > https://issues.apache.org/jira/browse/BEAM-2751
> >
> > I've started working on the Write part.
> >
> > Robert, after some time working on this, I'm unable to see how these
> > objectives can be "entirely done in the context of a DoFn". Could you
> lend
> > a hint?
> >
> > I assume you didn't mean for me to append to my pipeline a ParDo.of(new
> > DoFn... that manually writes out to some file location, did you? That
> would
> > lose all the benefits of the IO/Sink classes.
> >
> Which benefits? I think nearly everything that FileBasedSink does is
> unnecessary for WholeFileIO.write() - I agree with Robert that it can be
> implemented with a ParDo (I suppose he meant that WholeFileIO.write() can
> be implemented by expanding it into a single ParDo).
>
>
> >
> > That said, I've found the "sharding" logic to be deeply embedded in all the
> > FileBasedSink derivatives, and my attempt at sidestepping this logic isn't
> > going very well. I managed to write a FileBasedSink that writes byte[] out
> > (and that works correctly), but figuring out how to get the FilenamePolicy
> > to be different for each element written out seems counter to the intent of
> > these classes.
> >
> > Chris
> >
> > On Wed, Aug 2, 2017 at 10:23 AM, Reuven Lax 
> > wrote:
> >
> > > On Wed, Aug 2, 2017 at 7:49 AM, Chris Hebert <
> > > chris.hebert-...@digitalreasoning.com> wrote:
> > >
> > > > Thanks for the feedback!
> > > >
> > > > Aggregated thoughts:
> > > >
> > > >1. Warn users about large files (like 5GB large)
> > > >
> > >
> > > I would set the threshold smaller. Also remember that while you may warn,
> > > some runners might simply fail to process the record, causing pipelines to
> > > either get stuck or fail altogether.
> > >
> > >
> > > >2. Filenames can stick with contents via PCollection<KV<filename, contents>>
> > > >3. InputStreams can't be encoded directly, but could be referenced
> > in
> > > a
> > > >FileWrapper object
> > > >4. Be mindful of sink race conditions with multiple workers; make sure
> > > >failed workers clean up incompletely written files
> > > >5. File systems often do better with few large files than many
> small
> > > > ones
> > > >6. Most/all of this can be done in the context of a DoFn
> > > >
> > > > # Regarding point 1 and point 2
> > > > Yes!
> > > >
> > > > # Regarding point 3:
> > > >
> > > > ## Approach A:
> > > > When the FileWrapper is encoded, it must somehow encode a reference
> to
> > > the
> > > > InputStream it is associated with, so that when the FileWrapper is
> > > decoded
> > > > it can pick up that InputStream again. My Java knowledge isn't deep
> > > enough
> > > > to know how one would do that with hashcodes and object lookup and
> > such,
> > > > but I could sidestep that entirely by simply encoding the filepath
> with
> > > the
> > > > FileWrapper, then open up a new InputStream on that file path every
> > time
> > > > the FileWrapper is decoded.
> > > >
> > > > ## Approach B:
> > > > An alternative to the above technique is to simply pass a byte[]
> array,
> > > > like so:
> > > > PCollection<KV<String, byte[]>> fileNamesAndBytes = p.apply("Read",
> > > > WholeFileIO.read().from("/path/to/input/dir/*"));
> > > >
> > > > That would solve the encoding problem, allow users to get whatever
> they
> > > > want out of it with a ByteArrayInputStream, AND put a hard limit on
> the
> > > > size of incoming files at just below 2 GB (if my math is right).
> (This
> > is
> > > > large enough for my use case, at present.)
> > > >
> > > >
> > > > # Regarding point 4:
> > > >
> > > > Any examples or guidance I could pull from to protect against this
> > > > properly?
> > > >
> > > >
> > > > # Regarding point 5:
> > > >
> > > > TextIO can read and write with different compressions. Would it be
> > > feasible
> > > > for this WholeFileIO to read and write these many files to compressed
> > zip
> > > > files also? (I envision this as a stretch feature that would be added
> > > after
> > > > the initial iteration anyway.)
> > > >
> > > >
> > > > # Regarding p

Re: Proposed API for a Whole File IO

2017-08-07 Thread Eugene Kirpichov
Hi! Thanks for filing the tickets.

On Mon, Aug 7, 2017 at 1:44 PM Chris Hebert <
chris.hebert-...@digitalreasoning.com> wrote:

> Hello again,
>
> I created tickets to capture these requests:
> https://issues.apache.org/jira/browse/BEAM-2750
> https://issues.apache.org/jira/browse/BEAM-2751
>
> I've started working on the Write part.
>
> Robert, after some time working on this, I'm unable to see how these
> objectives can be "entirely done in the context of a DoFn". Could you lend
> a hint?
>
> I assume you didn't mean for me to append to my pipeline a ParDo.of(new
> DoFn... that manually writes out to some file location, did you? That would
> lose all the benefits of the IO/Sink classes.
>
Which benefits? I think nearly everything that FileBasedSink does is
unnecessary for WholeFileIO.write() - I agree with Robert that it can be
implemented with a ParDo (I suppose he meant that WholeFileIO.write() can
be implemented by expanding it into a single ParDo).
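
To make that concrete, the expansion could be as small as the following
sketch. The class name, the KV<String, byte[]> element type, and the
output-directory parameter are all assumptions here, not a settled API:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class WriteWholeFileFn extends DoFn<KV<String, byte[]>, Void> {
  private final String outputDir;

  WriteWholeFileFn(String outputDir) {
    this.outputDir = outputDir;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    // Resolve <outputDir>/<filename> on whatever filesystem outputDir names.
    ResourceId dir = FileSystems.matchNewResource(outputDir, true /* isDirectory */);
    ResourceId file =
        dir.resolve(c.element().getKey(), StandardResolveOptions.RESOLVE_FILE);
    try (WritableByteChannel channel =
        FileSystems.create(file, "application/octet-stream")) {
      channel.write(ByteBuffer.wrap(c.element().getValue()));
    }
  }
}

Keep in mind a bundle can be retried, so the destination write needs to be
idempotent (or staged through a temp file and renamed), per Robert's caveat
about races.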


>
> That said, I've found the "sharding" logic to be deeply embedded in all the
> FileBasedSink derivatives, and my attempt at sidestepping this logic isn't
> going very well. I managed to write a FileBasedSink that writes byte[] out
> (and that works correctly), but figuring out how to get the FilenamePolicy
> to be different for each element written out seems counter to the intent of
> these classes.
>
> Chris
>
> On Wed, Aug 2, 2017 at 10:23 AM, Reuven Lax 
> wrote:
>
> > On Wed, Aug 2, 2017 at 7:49 AM, Chris Hebert <
> > chris.hebert-...@digitalreasoning.com> wrote:
> >
> > > Thanks for the feedback!
> > >
> > > Aggregated thoughts:
> > >
> > >1. Warn users about large files (like 5GB large)
> > >
> >
> > I would set the threshold smaller. Also remember that while you may warn,
> > some runners might simply fail to process the record, causing pipelines to
> > either get stuck or fail altogether.
> >
> >
> > >2. Filenames can stick with contents via PCollection<KV<filename, contents>>
> > >3. InputStreams can't be encoded directly, but could be referenced
> in
> > a
> > >FileWrapper object
> > >4. Be mindful of sink race conditions with multiple workers; make sure
> > >failed workers clean up incompletely written files
> > >5. File systems often do better with few large files than many small
> > > ones
> > >6. Most/all of this can be done in the context of a DoFn
> > >
> > > # Regarding point 1 and point 2
> > > Yes!
> > >
> > > # Regarding point 3:
> > >
> > > ## Approach A:
> > > When the FileWrapper is encoded, it must somehow encode a reference to
> > the
> > > InputStream it is associated with, so that when the FileWrapper is
> > decoded
> > > it can pick up that InputStream again. My Java knowledge isn't deep
> > enough
> > > to know how one would do that with hashcodes and object lookup and
> such,
> > > but I could sidestep that entirely by simply encoding the filepath with
> > the
> > > FileWrapper, then open up a new InputStream on that file path every
> time
> > > the FileWrapper is decoded.
> > >
> > > ## Approach B:
> > > An alternative to the above technique is to simply pass a byte[] array,
> > > like so:
> > > PCollection<KV<String, byte[]>> fileNamesAndBytes = p.apply("Read",
> > > WholeFileIO.read().from("/path/to/input/dir/*"));
> > >
> > > That would solve the encoding problem, allow users to get whatever they
> > > want out of it with a ByteArrayInputStream, AND put a hard limit on the
> > > size of incoming files at just below 2 GB (if my math is right). (This
> is
> > > large enough for my use case, at present.)
> > >
> > >
> > > # Regarding point 4:
> > >
> > > Any examples or guidance I could pull from to protect against this
> > > properly?
> > >
> > >
> > > # Regarding point 5:
> > >
> > > TextIO can read and write with different compressions. Would it be
> > feasible
> > > for this WholeFileIO to read and write these many files to compressed
> zip
> > > files also? (I envision this as a stretch feature that would be added
> > after
> > > the initial iteration anyway.)
> > >
> > >
> > > # Regarding point 6:
> > >
> > > The only prebuilt IO thing I've found in Beam that uses DoFn is
> > > WriteFiles. Do you have any examples to point towards to enlighten me
> on
> > > the use of DoFn in this context? Unfortunately, we all know the
> > "Authoring
> > > I/O Transforms" documentation is sparse.
> > >
> > >
> > > Keep it coming,
> > > Chris
> > >
> > > On Tue, Aug 1, 2017 at 3:55 PM, Robert Bradshaw
> > >  > > > wrote:
> > >
> > > > On Tue, Aug 1, 2017 at 1:42 PM, Eugene Kirpichov <
> > > > kirpic...@google.com.invalid> wrote:
> > > >
> > > > > Hi,
> > > > > As mentioned on the PR - I support the creation of such an IO (both
> > > read
> > > > > and write) with the caveats that Reuven mentioned; we can refine
> the
> > > > naming
> > > > > during code review.
> > > > > Note that you won't be able to create a PCollection<InputStream> because
> > > > > elements of a PCollection must have a coder and it's not possible to
> > > > > provide a coder for InputStream.

Re: Proposed API for a Whole File IO

2017-08-07 Thread Chris Hebert
Hello again,

I created tickets to capture these requests:
https://issues.apache.org/jira/browse/BEAM-2750
https://issues.apache.org/jira/browse/BEAM-2751

I've started working on the Write part.

Robert, after some time working on this, I'm unable to see how these
objectives can be "entirely done in the context of a DoFn". Could you lend
a hint?

I assume you didn't mean for me to append to my pipeline a ParDo.of(new
DoFn... that manually writes out to some file location, did you? That would
lose all the benefits of the IO/Sink classes.

That said, I've found the "sharding" logic to be deeply embedded in all the
FileBasedSink derivatives, and my attempt at sidestepping this logic isn't
going very well. I managed to write a FileBasedSink that writes byte[] out
(and that works correctly), but figuring out how to get the FilenamePolicy
to be different for each element written out seems counter to the intent of
these classes.

Chris

On Wed, Aug 2, 2017 at 10:23 AM, Reuven Lax 
wrote:

> On Wed, Aug 2, 2017 at 7:49 AM, Chris Hebert <
> chris.hebert-...@digitalreasoning.com> wrote:
>
> > Thanks for the feedback!
> >
> > Aggregated thoughts:
> >
> >1. Warn users about large files (like 5GB large)
> >
>
> I would set the threshold smaller. Also remember that while you may warn,
> some runners might simply fail to process the record, causing pipelines to
> either get stuck or fail altogether.
>
>
> >2. Filenames can stick with contents via PCollection<KV<filename, contents>>
> >3. InputStreams can't be encoded directly, but could be referenced in
> a
> >FileWrapper object
> >4. Be mindful of sink race conditions with multiple workers; make sure
> >failed workers clean up incompletely written files
> >5. File systems often do better with few large files than many small
> > ones
> >6. Most/all of this can be done in the context of a DoFn
> >
> > # Regarding point 1 and point 2
> > Yes!
> >
> > # Regarding point 3:
> >
> > ## Approach A:
> > When the FileWrapper is encoded, it must somehow encode a reference to
> the
> > InputStream it is associated with, so that when the FileWrapper is
> decoded
> > it can pick up that InputStream again. My Java knowledge isn't deep
> enough
> > to know how one would do that with hashcodes and object lookup and such,
> > but I could sidestep that entirely by simply encoding the filepath with
> the
> > FileWrapper, then open up a new InputStream on that file path every time
> > the FileWrapper is decoded.
> >
> > ## Approach B:
> > An alternative to the above technique is to simply pass a byte[] array,
> > like so:
> > PCollection<KV<String, byte[]>> fileNamesAndBytes = p.apply("Read",
> > WholeFileIO.read().from("/path/to/input/dir/*"));
> >
> > That would solve the encoding problem, allow users to get whatever they
> > want out of it with a ByteArrayInputStream, AND put a hard limit on the
> > size of incoming files at just below 2 GB (if my math is right). (This is
> > large enough for my use case, at present.)
> >
> >
> > # Regarding point 4:
> >
> > Any examples or guidance I could pull from to protect against this
> > properly?
> >
> >
> > # Regarding point 5:
> >
> > TextIO can read and write with different compressions. Would it be
> feasible
> > for this WholeFileIO to read and write these many files to compressed zip
> > files also? (I envision this as a stretch feature that would be added
> after
> > the initial iteration anyway.)
> >
> >
> > # Regarding point 6:
> >
> > The only prebuilt IO thing I've found in Beam that uses DoFn is
> > WriteFiles. Do you have any examples to point towards to enlighten me on
> > the use of DoFn in this context? Unfortunately, we all know the
> "Authoring
> > I/O Transforms" documentation is sparse.
> >
> >
> > Keep it coming,
> > Chris
> >
> > On Tue, Aug 1, 2017 at 3:55 PM, Robert Bradshaw
> >  > > wrote:
> >
> > > On Tue, Aug 1, 2017 at 1:42 PM, Eugene Kirpichov <
> > > kirpic...@google.com.invalid> wrote:
> > >
> > > > Hi,
> > > > As mentioned on the PR - I support the creation of such an IO (both
> > read
> > > > and write) with the caveats that Reuven mentioned; we can refine the
> > > naming
> > > > during code review.
> > > Note that you won't be able to create a PCollection<InputStream> because
> > > > elements of a PCollection must have a coder and it's not possible to
> > > > provide a coder for InputStream.
> > >
> > >
> > > Well, it's possible, but the fact that InputStream is mutable may cause
> > > issues (e.g. if there's fusion, or when estimating its size).
> > >
> > > I would probably let the API consume/produce a PCollection<KV<filename,
> > > contents>>. Alternatively, a FileWrapper object of some kind could provide
> > > accessors to InputStream (or otherwise facilitate lazy reading).
> > >
> > > Note for the sink one must take care there's no race in case multiple
> > > workers are attempting to process the same bundle (and ideally clean up in
> > > the face of failure). Other than that, these could be entirely done in the
> > > context of a DoFn.

Re: Proposed API for a Whole File IO

2017-08-02 Thread Reuven Lax
On Wed, Aug 2, 2017 at 7:49 AM, Chris Hebert <
chris.hebert-...@digitalreasoning.com> wrote:

> Thanks for the feedback!
>
> Aggregated thoughts:
>
>1. Warn users about large files (like 5GB large)
>

I would set the threshold smaller. Also remember that while you may warn,
some runners might simply fail to process the record, causing pipelines to
either get stuck or fail altogether.


>2. Filenames can stick with contents via PCollection<KV<filename, contents>>
>3. InputStreams can't be encoded directly, but could be referenced in a
>FileWrapper object
>4. Be mindful of sink race conditions with multiple workers; make sure
>failed workers clean up incompletely written files
>5. File systems often do better with few large files than many small
> ones
>6. Most/all of this can be done in the context of a DoFn
>
> # Regarding point 1 and point 2
> Yes!
>
> # Regarding point 3:
>
> ## Approach A:
> When the FileWrapper is encoded, it must somehow encode a reference to the
> InputStream it is associated with, so that when the FileWrapper is decoded
> it can pick up that InputStream again. My Java knowledge isn't deep enough
> to know how one would do that with hashcodes and object lookup and such,
> but I could sidestep that entirely by simply encoding the filepath with the
> FileWrapper, then open up a new InputStream on that file path every time
> the FileWrapper is decoded.
>
> ## Approach B:
> An alternative to the above technique is to simply pass a byte[] array,
> like so:
> PCollection<KV<String, byte[]>> fileNamesAndBytes = p.apply("Read",
> WholeFileIO.read().from("/path/to/input/dir/*"));
>
> That would solve the encoding problem, allow users to get whatever they
> want out of it with a ByteArrayInputStream, AND put a hard limit on the
> size of incoming files at just below 2 GB (if my math is right). (This is
> large enough for my use case, at present.)
>
>
> # Regarding point 4:
>
> Any examples or guidance I could pull from to protect against this
> properly?
>
>
> # Regarding point 5:
>
> TextIO can read and write with different compressions. Would it be feasible
> for this WholeFileIO to read and write these many files to compressed zip
> files also? (I envision this as a stretch feature that would be added after
> the initial iteration anyway.)
>
>
> # Regarding point 6:
>
> The only prebuilt IO thing I've found in Beam that uses DoFn is
> WriteFiles. Do you have any examples to point towards to enlighten me on
> the use of DoFn in this context? Unfortunately, we all know the "Authoring
> I/O Transforms" documentation is sparse.
>
>
> Keep it coming,
> Chris
>
> On Tue, Aug 1, 2017 at 3:55 PM, Robert Bradshaw
>  > wrote:
>
> > On Tue, Aug 1, 2017 at 1:42 PM, Eugene Kirpichov <
> > kirpic...@google.com.invalid> wrote:
> >
> > > Hi,
> > > As mentioned on the PR - I support the creation of such an IO (both
> read
> > > and write) with the caveats that Reuven mentioned; we can refine the
> > naming
> > > during code review.
> > > Note that you won't be able to create a PCollection<InputStream> because
> > > elements of a PCollection must have a coder and it's not possible to
> > > provide a coder for InputStream.
> >
> >
> > Well, it's possible, but the fact that InputStream is mutable may cause
> > issues (e.g. if there's fusion, or when estimating its size).
> >
> > I would probably let the API consume/produce a PCollection<KV<filename,
> > contents>>. Alternatively, a FileWrapper object of some kind could provide
> > accessors to InputStream (or otherwise facilitate lazy reading).
> >
> > Note for the sink one must take care there's no race in case multiple
> > workers are attempting to process the same bundle (and ideally clean up in
> > the face of failure). Other than that, these could be entirely done in the
> > context of a DoFn.
> >
> > Also note that most filesystems, especially distributed ones, do better
> > reading and writing fewer larger files than many, many small ones.
> >
> >
> > > On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax 
> > > wrote:
> > >
> > > > One thing to keep in mind is that many runners might have issues with huge
> > > > elements. If you have a 5 GB file, encoding it as a single element might
> > > > give you pain or might simply not work on some runners.
> > > >
> > > > Reuven
> > > >
> > > > On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <
> > > > chris.hebert-...@digitalreasoning.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I'd like to:
> > > > >
> > > > >1. Read whole files as one input each. (If my input files are
> > > hi.txt,
> > > > >what.txt, and yes.txt, then the whole contents of hi.txt are an
> > > > element
> > > > > of
> > > > >the returned PCollection, the whole contents of what.txt are the
> > > next
> > > > >element, etc.)
> > > > >2. Write elements as individual files. (Rather than smashing
> > > thousands
> > > > >of outputs into a handful of files as TextIO does:
> > > > > output-0-of-5,
> > > > >output-1-of-

Re: Proposed API for a Whole File IO

2017-08-02 Thread Chris Hebert
Thanks for the feedback!

Aggregated thoughts:

   1. Warn users about large files (like 5GB large)
   2. Filenames can stick with contents via PCollection<KV<filename, contents>>
   3. InputStreams can't be encoded directly, but could be referenced in a
   FileWrapper object
   4. Be mindful of sink race conditions with multiple workers; make sure
   failed workers clean up incompletely written files
   5. File systems often do better with few large files than many small ones
   6. Most/all of this can be done in the context of a DoFn

# Regarding point 1 and point 2
Yes!

# Regarding point 3:

## Approach A:
When the FileWrapper is encoded, it must somehow encode a reference to the
InputStream it is associated with, so that when the FileWrapper is decoded
it can pick up that InputStream again. My Java knowledge isn't deep enough
to know how one would do that with hashcodes and object lookup and such,
but I could sidestep that entirely by simply encoding the filepath with the
FileWrapper, then open up a new InputStream on that file path every time
the FileWrapper is decoded.
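
For illustration, the coder I have in mind would look something like this.
Everything here is hypothetical (the FileWrapper class, its coder, the lazy
open()); the key point is that only the path is ever serialized:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;

class FileWrapper implements Serializable {
  final String path;

  FileWrapper(String path) {
    this.path = path;
  }

  // Reopen the file on demand; the stream itself is never serialized.
  InputStream open() throws IOException {
    return Channels.newInputStream(
        FileSystems.open(FileSystems.matchSingleFileSpec(path).resourceId()));
  }
}

class FileWrapperCoder extends AtomicCoder<FileWrapper> {
  @Override
  public void encode(FileWrapper value, OutputStream outStream) throws IOException {
    // Encode only the path, not the stream or its contents.
    StringUtf8Coder.of().encode(value.path, outStream);
  }

  @Override
  public FileWrapper decode(InputStream inStream) throws IOException {
    return new FileWrapper(StringUtf8Coder.of().decode(inStream));
  }
}

The obvious caveat is that a decoded wrapper can only open() the path if the
worker it lands on can read it, which is fine on a distributed filesystem but
not for worker-local files.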

## Approach B:
An alternative to the above technique is to simply pass a byte[] array,
like so:
PCollection<KV<String, byte[]>> fileNamesAndBytes = p.apply("Read",
WholeFileIO.read().from("/path/to/input/dir/*"));

That would solve the encoding problem, allow users to get whatever they
want out of it with a ByteArrayInputStream, AND put a hard limit on the
size of incoming files at just below 2 GB (if my math is right: Java arrays
are indexed by int, so a byte[] tops out at 2^31 - 1 bytes). (This is
large enough for my use case, at present.)


# Regarding point 4:

Any examples or guidance I could pull from to protect against this properly?


# Regarding point 5:

TextIO can read and write with different compressions. Would it be feasible
for this WholeFileIO to read and write these many files to compressed zip
files also? (I envision this as a stretch feature that would be added after
the initial iteration anyway.)
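
If it were added, the bundle lifecycle looks like a natural hook. Here's a
purely hypothetical sketch that rolls each bundle's (filename, contents)
elements into one zip archive; the class name, the one-archive-per-bundle
policy, and the output path scheme are all assumptions:

import java.io.IOException;
import java.nio.channels.Channels;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class ZipBundleFn extends DoFn<KV<String, byte[]>, Void> {
  private final String outputDir;
  private transient ZipOutputStream zip;

  ZipBundleFn(String outputDir) {
    this.outputDir = outputDir;
  }

  @ProcessElement
  public void process(ProcessContext c) throws IOException {
    if (zip == null) {
      // One uniquely named archive per bundle attempt, so retried bundles
      // don't collide with each other.
      String path = outputDir + "/bundle-" + UUID.randomUUID() + ".zip";
      zip = new ZipOutputStream(Channels.newOutputStream(FileSystems.create(
          FileSystems.matchNewResource(path, false /* isDirectory */),
          "application/zip")));
    }
    // One zip entry per element, named after the element's filename.
    zip.putNextEntry(new ZipEntry(c.element().getKey()));
    zip.write(c.element().getValue());
    zip.closeEntry();
  }

  @FinishBundle
  public void finishBundle() throws IOException {
    if (zip != null) {
      zip.close();
      zip = null;
    }
  }
}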


# Regarding point 6:

The only prebuilt IO thing I've found in Beam that uses DoFn is
WriteFiles. Do you have any examples to point towards to enlighten me on
the use of DoFn in this context? Unfortunately, we all know the "Authoring
I/O Transforms" documentation is sparse.


Keep it coming,
Chris

On Tue, Aug 1, 2017 at 3:55 PM, Robert Bradshaw  wrote:

> On Tue, Aug 1, 2017 at 1:42 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Hi,
> > As mentioned on the PR - I support the creation of such an IO (both read
> > and write) with the caveats that Reuven mentioned; we can refine the
> naming
> > during code review.
> > Note that you won't be able to create a PCollection<InputStream> because
> > elements of a PCollection must have a coder and it's not possible to
> > provide a coder for InputStream.
>
>
> Well, it's possible, but the fact that InputStream is mutable may cause
> issues (e.g. if there's fusion, or when estimating its size).
>
> I would probably let the API consume/produce a PCollection<KV<filename,
> contents>>. Alternatively, a FileWrapper object of some kind could provide
> accessors to InputStream (or otherwise facilitate lazy reading).
>
> Note for the sink one must take care there's no race in case multiple
> workers are attempting to process the same bundle (and ideally clean up in
> the face of failure). Other than that, these could be entirely done in the
> context of a DoFn.
>
> Also note that most filesystems, especially distributed ones, do better
> reading and writing fewer larger files than many, many small ones.
>
>
> > On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax 
> > wrote:
> >
> > > One thing to keep in mind is that many runners might have issues with huge
> > > elements. If you have a 5 GB file, encoding it as a single element might
> > > give you pain or might simply not work on some runners.
> > >
> > > Reuven
> > >
> > > On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <
> > > chris.hebert-...@digitalreasoning.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I'd like to:
> > > >
> > > >1. Read whole files as one input each. (If my input files are
> > hi.txt,
> > > >what.txt, and yes.txt, then the whole contents of hi.txt are an
> > > element
> > > > of
> > > >the returned PCollection, the whole contents of what.txt are the
> > next
> > > >element, etc.)
> > > >2. Write elements as individual files. (Rather than smashing
> > thousands
> > > >of outputs into a handful of files as TextIO does:
> > > > output-0-of-5,
> > > >output-1-of-5,..., I want to output each thing
> individually.
> > > So
> > > > if
> > > >I'm given hi.txt, what.txt, yes.txt then I'd like to read those in
> > as
> > > > whole
> > > >files individually, then write out my processed results as
> > > > hi.txt-modified,
> > > >what.txt-modified, yes.txt-modified.).
> > > >
> > > > Before reading on, if you have easier ways to do these things, then
> I'd
> > > > love to hear them!
> > > >
> > > > # Part 1
> > > >
> > > > I attempted Part 1 with this PR:
> > > > https://github.com/apache/beam/p

Re: Proposed API for a Whole File IO

2017-08-01 Thread Robert Bradshaw
On Tue, Aug 1, 2017 at 1:42 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> Hi,
> As mentioned on the PR - I support the creation of such an IO (both read
> and write) with the caveats that Reuven mentioned; we can refine the naming
> during code review.
> Note that you won't be able to create a PCollection<InputStream> because
> elements of a PCollection must have a coder and it's not possible to
> provide a coder for InputStream.


Well, it's possible, but the fact that InputStream is mutable may cause
issues (e.g. if there's fusion, or when estimating its size).

I would probably let the API consume/produce a PCollection<KV<filename,
contents>>. Alternatively, a FileWrapper object of some kind could provide
accessors to InputStream (or otherwise facilitate lazy reading).

Note for the sink one must take care there's no race in case multiple
workers are attempting to process the same bundle (and ideally clean up in
the face of failure). Other than that, these could be entirely done in the
context of a DoFn.
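
For example (names assumed, sketch only): write each element to a uniquely
named temp file and only rename it into place once the write succeeds, so a
partially written file is never visible at the final destination:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Collections;
import java.util.UUID;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;

void writeThenRename(ResourceId dir, String filename, byte[] contents)
    throws IOException {
  // Unique temp name per attempt, so concurrent attempts at the same bundle
  // never clobber each other's in-progress writes.
  ResourceId temp = dir.resolve(".tmp-" + UUID.randomUUID() + "-" + filename,
      StandardResolveOptions.RESOLVE_FILE);
  ResourceId dest = dir.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
  try (WritableByteChannel channel =
      FileSystems.create(temp, "application/octet-stream")) {
    channel.write(ByteBuffer.wrap(contents));
  }
  // Losers of a race just replace the destination with identical contents.
  FileSystems.rename(Collections.singletonList(temp),
      Collections.singletonList(dest));
}

Failed attempts can still leave orphaned temp files behind, so a real
implementation would also want a sweep over the temp prefix.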

Also note that most filesystems, especially distributed ones, do better
reading and writing fewer larger files than many, many small ones.


> On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax 
> wrote:
>
> > One thing to keep in mind is that many runners might have issues with huge
> > elements. If you have a 5 GB file, encoding it as a single element might
> > give you pain or might simply not work on some runners.
> >
> > Reuven
> >
> > On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <
> > chris.hebert-...@digitalreasoning.com> wrote:
> >
> > > Hi,
> > >
> > > I'd like to:
> > >
> > >1. Read whole files as one input each. (If my input files are
> hi.txt,
> > >what.txt, and yes.txt, then the whole contents of hi.txt are an
> > element
> > > of
> > >the returned PCollection, the whole contents of what.txt are the
> next
> > >element, etc.)
> > >2. Write elements as individual files. (Rather than smashing
> thousands
> > >of outputs into a handful of files as TextIO does:
> > > output-0-of-5,
> > >output-1-of-5,..., I want to output each thing individually.
> > So
> > > if
> > >I'm given hi.txt, what.txt, yes.txt then I'd like to read those in
> as
> > > whole
> > >files individually, then write out my processed results as
> > > hi.txt-modified,
> > >what.txt-modified, yes.txt-modified.).
> > >
> > > Before reading on, if you have easier ways to do these things, then I'd
> > > love to hear them!
> > >
> > > # Part 1
> > >
> > > I attempted Part 1 with this PR:
> > > https://github.com/apache/beam/pull/3543
> > >
> > > But I overgeneralized.
> > >
> > > Specifically, I want:
> > >
> > > Pipeline p = Pipeline.create(options);
> > > PCollection<String> wholeFilesAsStrings = p.apply("Read Whole Files from
> > > Input Directory", WholeFileIO.read().from("/path/to/input/dir/*"));
> > >
> > > or
> > >
> > > Pipeline p = Pipeline.create(options);
> > > PCollection<InputStream> wholeFileStreams = p.apply("Read Whole Files from
> > > Input Directory", WholeFileIO.read().from("/path/to/input/dir/*"));
> > >
> > > Bonus points would include:
> > >
> > >- Keeping the filename somehow
> > >
> > >
> > > # Part 2
> > >
> > > Currently, if you output hundreds of thousands of elements (batch-mode)
> > > with TextIO in Beam on Flink with, say, 45 TaskManagers, then you get
> > > output-0-of-00045, output-1-of-00045, etc., and each one of those
> > > files contains tens of thousands of outputs back to back. I want them to be
> > > output as individual files.
> > >
> > > If appended after code snippets from Part 1, it would look like:
> > >
> > > ...
> > > p.apply("Write Whole File Outputs",
> > > WholeFileIO.write().to("/path/to/output/dir/"));
> > >
> > > Bonus points would include:
> > >
> > >- Writing each element of the given PCollection to the filename
> they'd
> > >like to go to.
> > >- Parallelizable. (This might already be done, I just noticed that
> my
> > >Beam+Flink+YARN pipeline with TextIO.write() only had one
> TaskManager
> > >writing the DataSink output even though all other components of my
> > > pipeline
> > >had many TaskManagers working on them simultaneously. I haven't
> found
> > > the
> > >way to fix that yet. The current arrangement added 15 minutes to the
> > > end of
> > >my pipeline as the lonely TaskManager did all the output.)
> > >
> > >
> > > I'm available to put the dev work into this. (Actually, I'm putting dev
> > > time into some kind of solution whether this is agreed upon or not :).
> > >
> > > Feedback, please,
> > > Chris
> > >
> >
>


Re: Proposed API for a Whole File IO

2017-08-01 Thread Eugene Kirpichov
Hi,
As mentioned on the PR - I support the creation of such an IO (both read
and write) with the caveats that Reuven mentioned; we can refine the naming
during code review.
Note that you won't be able to create a PCollection<InputStream> because
elements of a PCollection must have a coder and it's not possible to
provide a coder for InputStream.

On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax  wrote:

> One thing to keep in mind is that many runners might have issues with huge
> elements. If you have a 5 GB file, encoding it as a single element might
> give you pain or might simply not work on some runners.
>
> Reuven
>
> On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <
> chris.hebert-...@digitalreasoning.com> wrote:
>
> > Hi,
> >
> > I'd like to:
> >
> >1. Read whole files as one input each. (If my input files are hi.txt,
> >what.txt, and yes.txt, then the whole contents of hi.txt are an
> element
> > of
> >the returned PCollection, the whole contents of what.txt are the next
> >element, etc.)
> >2. Write elements as individual files. (Rather than smashing thousands
> >of outputs into a handful of files as TextIO does:
> > output-0-of-5,
> >output-1-of-5,..., I want to output each thing individually.
> So
> > if
> >I'm given hi.txt, what.txt, yes.txt then I'd like to read those in as
> > whole
> >files individually, then write out my processed results as
> > hi.txt-modified,
> >what.txt-modified, yes.txt-modified.).
> >
> > Before reading on, if you have easier ways to do these things, then I'd
> > love to hear them!
> >
> > # Part 1
> >
> > I attempted Part 1 with this PR:
> > https://github.com/apache/beam/pull/3543
> >
> > But I overgeneralized.
> >
> > Specifically, I want:
> >
> > Pipeline p = Pipeline.create(options);
> > PCollection<String> wholeFilesAsStrings = p.apply("Read Whole Files from
> > Input Directory", WholeFileIO.read().from("/path/to/input/dir/*"));
> >
> > or
> >
> > Pipeline p = Pipeline.create(options);
> > PCollection<InputStream> wholeFileStreams = p.apply("Read Whole Files from
> > Input Directory", WholeFileIO.read().from("/path/to/input/dir/*"));
> >
> > Bonus points would include:
> >
> >- Keeping the filename somehow
> >
> >
> > # Part 2
> >
> > Currently, if you output hundreds of thousands of elements (batch-mode)
> > with TextIO in Beam on Flink with, say, 45 TaskManagers, then you get
> > output-0-of-00045, output-1-of-00045, etc., and each one of those
> > files contains tens of thousands of outputs back to back. I want them to be
> > output as individual files.
> >
> > If appended after code snippets from Part 1, it would look like:
> >
> > ...
> > p.apply("Write Whole File Outputs",
> > WholeFileIO.write().to("/path/to/output/dir/"));
> >
> > Bonus points would include:
> >
> >- Writing each element of the given PCollection to the filename they'd
> >like to go to.
> >- Parallelizable. (This might already be done, I just noticed that my
> >Beam+Flink+YARN pipeline with TextIO.write() only had one TaskManager
> >writing the DataSink output even though all other components of my
> > pipeline
> >had many TaskManagers working on them simultaneously. I haven't found
> > the
> >way to fix that yet. The current arrangement added 15 minutes to the
> > end of
> >my pipeline as the lonely TaskManager did all the output.)
> >
> >
> > I'm available to put the dev work into this. (Actually, I'm putting dev
> > time into some kind of solution whether this is agreed upon or not :).
> >
> > Feedback, please,
> > Chris
> >
>


Re: Proposed API for a Whole File IO

2017-08-01 Thread Reuven Lax
One thing to keep in mind is that many runners might have issues with huge
elements. If you have a 5 GB file, encoding it as a single element might
give you pain or might simply not work on some runners.
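
A cheap guard on the read side (sketch only: the threshold is an arbitrary
placeholder, and path and LOG are assumed from the surrounding read DoFn) is
to check a file's size before materializing it as an element:

// Skip, or route to a side output, anything too large to be one element.
long maxBytes = 256L * 1024 * 1024;
MatchResult.Metadata metadata = FileSystems.matchSingleFileSpec(path);
if (metadata.sizeBytes() > maxBytes) {
  LOG.warn("Skipping {}: {} bytes exceeds the single-element limit",
      path, metadata.sizeBytes());
  return;
}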

Reuven

On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <
chris.hebert-...@digitalreasoning.com> wrote:

> Hi,
>
> I'd like to:
>
>1. Read whole files as one input each. (If my input files are hi.txt,
>what.txt, and yes.txt, then the whole contents of hi.txt are an element
> of
>the returned PCollection, the whole contents of what.txt are the next
>element, etc.)
>2. Write elements as individual files. (Rather than smashing thousands
>of outputs into a handful of files as TextIO does:
> output-0-of-5,
>output-1-of-5,..., I want to output each thing individually. So
> if
>I'm given hi.txt, what.txt, yes.txt then I'd like to read those in as
> whole
>files individually, then write out my processed results as
> hi.txt-modified,
>what.txt-modified, yes.txt-modified.).
>
> Before reading on, if you have easier ways to do these things, then I'd
> love to hear them!
>
> # Part 1
>
> I attempted Part 1 with this PR:
> https://github.com/apache/beam/pull/3543
>
> But I overgeneralized.
>
> Specifically, I want:
>
> Pipeline p = Pipeline.create(options);
> PCollection<String> wholeFilesAsStrings = p.apply("Read Whole Files from
> Input Directory", WholeFileIO.read().from("/path/to/input/dir/*"));
>
> or
>
> Pipeline p = Pipeline.create(options);
> PCollection<InputStream> wholeFileStreams = p.apply("Read Whole Files from
> Input Directory", WholeFileIO.read().from("/path/to/input/dir/*"));
>
> Bonus points would include:
>
>- Keeping the filename somehow
>
>
> # Part 2
>
> Currently, if you output hundreds of thousands of elements (batch-mode)
> with TextIO in Beam on Flink with, say, 45 TaskManagers, then you get
> output-0-of-00045, output-1-of-00045, etc., and each one of those
> files contains tens of thousands of outputs back to back. I want them to be
> output as individual files.
>
> If appended after code snippets from Part 1, it would look like:
>
> ...
> p.apply("Write Whole File Outputs",
> WholeFileIO.write().to("/path/to/output/dir/"));
>
> Bonus points would include:
>
>- Writing each element of the given PCollection to the filename they'd
>like to go to.
>- Parallelizable. (This might already be done, I just noticed that my
>Beam+Flink+YARN pipeline with TextIO.write() only had one TaskManager
>writing the DataSink output even though all other components of my
> pipeline
>had many TaskManagers working on them simultaneously. I haven't found
> the
>way to fix that yet. The current arrangement added 15 minutes to the
> end of
>my pipeline as the lonely TaskManager did all the output.)
>
>
> I'm available to put the dev work into this. (Actually, I'm putting dev
> time into some kind of solution whether this is agreed upon or not :).
>
> Feedback, please,
> Chris
>