Re: Filesystems.copy and .rename behavior

2018-02-02 Thread Robert Bradshaw
Pipeline stages must be retry-tolerant: for example, the VM a stage is
running on might get shut down, and we should not fail the job in that case.

It seems the current implementation could only produce bad results if
(1) unrelated output files already existed and (2) the temporary files
were either not written or deleted out-of-band. (2) seems really
unlikely, but the problem can be eliminated entirely if we ensure that
(1) cannot happen (e.g. by deleting destination files before starting
the rename).
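
A minimal sketch of that idea, assuming a plain dict as a stand-in for the
filesystem. The names clear_destinations and rename_step are hypothetical and
are not Beam APIs. The delete step runs once, before a barrier; only the
rename step is retried, so any destination it finds must have come from an
earlier attempt of the same rename.

```python
# Hypothetical in-memory "filesystem": path -> contents. Not a Beam API.

def clear_destinations(fs, renames):
    """Step 1 (runs before the barrier): remove every destination file."""
    for _, dst in renames:
        fs.pop(dst, None)  # deleting a missing file is not an error


def rename_step(fs, renames):
    """Step 2 (may be retried): move each src to dst via copy + delete."""
    for src, dst in renames:
        if src in fs:
            fs[dst] = fs[src]  # copy, overwriting any partial result
            del fs[src]        # then delete the temporary file
        elif dst not in fs:
            # Neither file exists: this is a real error, never ignored.
            raise FileNotFoundError(f"neither {src} nor {dst} exists")
        # else: src is gone and dst exists. Because step 1 cleared dst,
        # it can only have been written by an earlier attempt of this step.


fs = {"tmp/a": "new-a", "tmp/b": "new-b", "out/a": "stale-a"}
renames = [("tmp/a", "out/a"), ("tmp/b", "out/b")]

clear_destinations(fs, renames)
rename_step(fs, renames)
rename_step(fs, renames)  # simulated retry of the rename step only
```

Note that retrying clear_destinations after a partial rename would delete good
output, which is why the sketch keeps the two steps separate.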

On Fri, Feb 2, 2018 at 12:13 PM, Reuven Lax  wrote:
>
>
> On Fri, Feb 2, 2018 at 11:17 AM, Chamikara Jayalath 
> wrote:
>>
>> Currently, Python file-based sink is batch only.
>
>
> Sure, but that won't be true forever.
>
>>
>>
>> Regarding Raghu's question, stage/pipeline failure should not be
>> considered as a data loss but I prefer overriding existing output and
>> completing a possibly expensive pipeline over failing the whole pipeline due
>> to one or more existing files.
>>
>> - Cham
>>
>>
>> On Fri, Feb 2, 2018 at 10:21 AM Reuven Lax  wrote:
>>>
>>> However this code might run in streaming as well, right?
>>>
>>> On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi  wrote:

 In a batch pipeline, is it considered a data loss if the stage fails
 (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
 might be better to favor correctness and fail in current implementation.


 On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw 
 wrote:
>
> You could add a step to delete all of dest before a barrier and the
> step that does the rename as outlined. In that case, any dest file
> that exists must be good.
>
> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov 
> wrote:
> > I think this is still unsafe in case exists(dst) (e.g. this is a
> > re-run of a
> > pipeline) but src is missing due to some bad reason. However it's
> > probably
> > better than what we have (e.g. we currently certainly don't perform
> > checksum
> > checks).
> >
> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri  wrote:
> >>
> >> For GCS, I would do what I believe we already do.
> >> rename(src, dst):
> >> - if !exists(src) and exists(dst) return 0
> >> - if !exists(src) and !exists(dst) return error
> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
> >> return 0 else delete(dst) }
> >> - Start a GCS copy from src to dst.
> >> - Wait for GCS copy to complete.
> >> - delete(src)
> >>
> >> For filesystems that don't have checksum() metadata, size() can be
> >> used
> >> instead.
> >>
> >> I've opened a bug to track this:
> >> https://issues.apache.org/jira/browse/BEAM-3600
> >>
> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov
> >> 
> >> wrote:
> >>>
> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
> >>> files
> >>> that are missing for more ominous reasons than just this being a
> >>> non-first
> >>> attempt at renaming src to dst. E.g. if there was a bug in
> >>> constructing the
> >>> filename to be renamed, or if we somehow messed up the order of
> >>> rename vs
> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would
> >>> lead to
> >>> silent data loss (likely caught by unit tests though - so this is
> >>> not a
> >>> super serious issue).
> >>>
> >>> Basically I just can't think of a case when I was copying files and
> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
> >>> copying
> >>> doesn't exist" - the option exists only because we couldn't come up
> >>> with
> >>> another way to implement idempotent rename on GCS.
> >>>
> >>> What's your idea of how a safe retryable GCS rename() could work?
> >>>
> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:
> 
>  Eugene, if I get this right, you're saying that
>  IGNORE_MISSING_FILES is
>  unsafe because it will skip (src, dst) pairs where neither exist?
>  (it only
>  looks if src exists)
> 
>  For GCS, we can construct a safe retryable rename() operation,
>  assuming
>  that copy() and delete() are atomic for a single file or pair of
>  files.
> 
> 
> 
>  On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi 
>  wrote:
> >
> > On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
> >  wrote:
> >>
> >> As far as I know, the current implementation of file sinks is
> >> the only
> >> reason why the flag IGNORE_MISSING for copying even exists -
> >> there's no

Re: Filesystems.copy and .rename behavior

2018-02-02 Thread Reuven Lax
On Fri, Feb 2, 2018 at 11:17 AM, Chamikara Jayalath 
wrote:

> Currently, Python file-based sink is batch only.
>

Sure, but that won't be true forever.


>
> Regarding Raghu's question, stage/pipeline failure should not be
> considered as a data loss but I prefer overriding existing output and
> completing a possibly expensive pipeline over failing the whole pipeline
> due to one or more existing files.
>
> - Cham
>
>
> On Fri, Feb 2, 2018 at 10:21 AM Reuven Lax  wrote:
>
>> However this code might run in streaming as well, right?
>>
>> On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi  wrote:
>>
>>> In a batch pipeline, is it considered a data loss if the stage fails
>>> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
>>> might be better to favor correctness and fail in current implementation.
>>>
>>>

Re: Filesystems.copy and .rename behavior

2018-02-02 Thread Raghu Angadi
On Fri, Feb 2, 2018 at 10:21 AM, Reuven Lax  wrote:

> However this code might run in streaming as well, right?
>

True. What is the best-practice recommendation for handling it? Probably the
author of the sink transform should consider the context and decide whether it
needs to be retry-tolerant when setting up the transform. I think the current
behavior of not overwriting the output would be very surprising to
unsuspecting users.


>
> On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi  wrote:
>
>> In a batch pipeline, is it considered a data loss if the stage fails
>> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
>> might be better to favor correctness and fail in current implementation.
>>
>>
>> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw 
>> wrote:
>>
>>> You could add a step to delete all of dest before a barrier and the
>>> step that does the rename as outlined. In that case, any dest file
>>> that exists must be good.
>>>
>>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov 
>>> wrote:
>>> > I think this is still unsafe in case exists(dst) (e.g. this is a
>>> re-run of a
>>> > pipeline) but src is missing due to some bad reason. However it's
>>> probably
>>> > better than what we have (e.g. we currently certainly don't perform
>>> checksum
>>> > checks).
>>> >
>>> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri  wrote:
>>> >>
>>> >> For GCS, I would do what I believe we already do.
>>> >> rename(src, dst):
>>> >> - if !exists(src) and exists(dst) return 0
>>> >> - if !exists(src) and !exists(dst) return error
>>> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>>> >> return 0 else delete(dst) }
>>> >> - Start a GCS copy from src to dst.
>>> >> - Wait for GCS copy to complete.
>>> >> - delete(src)
>>> >>
>>> >> For filesystems that don't have checksum() metadata, size() can be
>>> used
>>> >> instead.
>>> >>
>>> >> I've opened a bug to track this:
>>> >> https://issues.apache.org/jira/browse/BEAM-3600
>>> >>
>>> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov
>>> >> wrote:
>>> >>>
>>> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
>>> files
>>> >>> that are missing for more ominous reasons than just this being a
>>> non-first
>>> >>> attempt at renaming src to dst. E.g. if there was a bug in
>>> constructing the
>>> >>> filename to be renamed, or if we somehow messed up the order of
>>> rename vs
>>> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would
>>> lead to
>>> >>> silent data loss (likely caught by unit tests though - so this is
>>> not a
>>> >>> super serious issue).
>>> >>>
>>> >>> Basically I just can't think of a case when I was copying files and
>>> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
>>> copying
>>> >>> doesn't exist" - the option exists only because we couldn't come up
>>> with
>>> >>> another way to implement idempotent rename on GCS.
>>> >>>
>>> >>> What's your idea of how a safe retryable GCS rename() could work?
>>> >>>
>>> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:
>>> 
>>>  Eugene, if I get this right, you're saying that
>>> IGNORE_MISSING_FILES is
>>>  unsafe because it will skip (src, dst) pairs where neither exist?
>>> (it only
>>>  looks if src exists)
>>> 
>>>  For GCS, we can construct a safe retryable rename() operation,
>>> assuming
>>>  that copy() and delete() are atomic for a single file or pair of
>>> files.
>>> 
>>> 
>>> 
>>>  On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi 
>>> wrote:
>>> >
>>> > On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>>> >  wrote:
>>> >>
>>> >> As far as I know, the current implementation of file sinks is the
>>> only
>>> >> reason why the flag IGNORE_MISSING for copying even exists -
>>> there's no
>>> >> other compelling reason to justify it. We implement "rename" as
>>> "copy, then
>>> >> delete" (in a single DoFn), so for idempotency of this operation
>>> we need to
>>> >> ignore the copying of a non-existent file.
>>> >>
>>> >> I think the right way to go would be to change the implementation
>>> of
>>> >> renaming to have a @RequiresStableInput (or reshuffle) in the
>>> middle, so
>>> >> it's made of 2 individually idempotent operations:
>>> >> 1) copy, which would fail if input is missing, and would overwrite
>>> >> output if it exists
>>> >> -- reshuffle --
>>> >> 2) delete, which would not fail if input is missing.
>>> >
>>> >
>>> > Something like this is needed only in streaming, right?
>>> >
>>> > Raghu.
>>> >
>>> >>
>>> >> That way first everything is copied (possibly via multiple
>>> attempts),
>>> >> and then old files are deleted (possibly via multiple attempts).
>>> 

Re: Filesystems.copy and .rename behavior

2018-02-02 Thread Chamikara Jayalath
Currently, Python file-based sink is batch only.

Regarding Raghu's question, stage/pipeline failure should not be considered
data loss, but I prefer overwriting existing output and completing a
possibly expensive pipeline over failing the whole pipeline due to one or
more existing files.

- Cham

On Fri, Feb 2, 2018 at 10:21 AM Reuven Lax  wrote:

> However this code might run in streaming as well, right?
>
> On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi  wrote:
>
>> In a batch pipeline, is it considered a data loss if the stage fails
>> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
>> might be better to favor correctness and fail in current implementation.
>>
>>
>> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw 
>> wrote:
>>
>>> You could add a step to delete all of dest before a barrier and the
>>> step that does the rename as outlined. In that case, any dest file
>>> that exists must be good.
>>>
>>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov 
>>> wrote:
>>> > I think this is still unsafe in case exists(dst) (e.g. this is a
>>> re-run of a
>>> > pipeline) but src is missing due to some bad reason. However it's
>>> probably
>>> > better than what we have (e.g. we currently certainly don't perform
>>> checksum
>>> > checks).
>>> >
>>> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri  wrote:
>>> >>
>>> >> For GCS, I would do what I believe we already do.
>>> >> rename(src, dst):
>>> >> - if !exists(src) and exists(dst) return 0
>>> >> - if !exists(src) and !exists(dst) return error
>>> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>>> >> return 0 else delete(dst) }
>>> >> - Start a GCS copy from src to dst.
>>> >> - Wait for GCS copy to complete.
>>> >> - delete(src)
>>> >>
>>> >> For filesystems that don't have checksum() metadata, size() can be
>>> used
>>> >> instead.
>>> >>
>>> >> I've opened a bug to track this:
>>> >> https://issues.apache.org/jira/browse/BEAM-3600
>>> >>
>>> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov
>>> >> wrote:
>>> >>>
>>> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
>>> files
>>> >>> that are missing for more ominous reasons than just this being a
>>> non-first
>>> >>> attempt at renaming src to dst. E.g. if there was a bug in
>>> constructing the
>>> >>> filename to be renamed, or if we somehow messed up the order of
>>> rename vs
>>> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would
>>> lead to
>>> >>> silent data loss (likely caught by unit tests though - so this is
>>> not a
>>> >>> super serious issue).
>>> >>>
>>> >>> Basically I just can't think of a case when I was copying files and
>>> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
>>> copying
>>> >>> doesn't exist" - the option exists only because we couldn't come up
>>> with
>>> >>> another way to implement idempotent rename on GCS.
>>> >>>
>>> >>> What's your idea of how a safe retryable GCS rename() could work?
>>> >>>
>>> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:
>>> 
>>>  Eugene, if I get this right, you're saying that
>>> IGNORE_MISSING_FILES is
>>>  unsafe because it will skip (src, dst) pairs where neither exist?
>>> (it only
>>>  looks if src exists)
>>> 
>>>  For GCS, we can construct a safe retryable rename() operation,
>>> assuming
>>>  that copy() and delete() are atomic for a single file or pair of
>>> files.
>>> 
>>> 
>>> 
>>>  On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi 
>>> wrote:
>>> >
>>> > On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>>> >  wrote:
>>> >>
>>> >> As far as I know, the current implementation of file sinks is the
>>> only
>>> >> reason why the flag IGNORE_MISSING for copying even exists -
>>> there's no
>>> >> other compelling reason to justify it. We implement "rename" as
>>> "copy, then
>>> >> delete" (in a single DoFn), so for idempotency of this operation
>>> we need to
>>> >> ignore the copying of a non-existent file.
>>> >>
>>> >> I think the right way to go would be to change the implementation
>>> of
>>> >> renaming to have a @RequiresStableInput (or reshuffle) in the
>>> middle, so
>>> >> it's made of 2 individually idempotent operations:
>>> >> 1) copy, which would fail if input is missing, and would overwrite
>>> >> output if it exists
>>> >> -- reshuffle --
>>> >> 2) delete, which would not fail if input is missing.
>>> >
>>> >
>>> > Something like this is needed only in streaming, right?
>>> >
>>> > Raghu.
>>> >
>>> >>
>>> >> That way first everything is copied (possibly via multiple
>>> attempts),
>>> >> and then old files are deleted (possibly via multiple attempts).
>>> >>
>>> 

Re: Filesystems.copy and .rename behavior

2018-02-02 Thread Reuven Lax
However this code might run in streaming as well, right?

On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi  wrote:

> In a batch pipeline, is it considered a data loss if the stage fails
> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
> might be better to favor correctness and fail in current implementation.
>
>
> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw 
> wrote:
>
>> You could add a step to delete all of dest before a barrier and the
>> step that does the rename as outlined. In that case, any dest file
>> that exists must be good.
>>
>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov 
>> wrote:
>> > I think this is still unsafe in case exists(dst) (e.g. this is a re-run
>> of a
>> > pipeline) but src is missing due to some bad reason. However it's
>> probably
>> > better than what we have (e.g. we currently certainly don't perform
>> checksum
>> > checks).
>> >
>> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri  wrote:
>> >>
>> >> For GCS, I would do what I believe we already do.
>> >> rename(src, dst):
>> >> - if !exists(src) and exists(dst) return 0
>> >> - if !exists(src) and !exists(dst) return error
>> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>> >> return 0 else delete(dst) }
>> >> - Start a GCS copy from src to dst.
>> >> - Wait for GCS copy to complete.
>> >> - delete(src)
>> >>
>> >> For filesystems that don't have checksum() metadata, size() can be used
>> >> instead.
>> >>
>> >> I've opened a bug to track this:
>> >> https://issues.apache.org/jira/browse/BEAM-3600
>> >>
>> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov 
>> >> wrote:
>> >>>
>> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
>> files
>> >>> that are missing for more ominous reasons than just this being a
>> non-first
>> >>> attempt at renaming src to dst. E.g. if there was a bug in
>> constructing the
>> >>> filename to be renamed, or if we somehow messed up the order of
>> rename vs
>> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead
>> to
>> >>> silent data loss (likely caught by unit tests though - so this is not
>> a
>> >>> super serious issue).
>> >>>
>> >>> Basically I just can't think of a case when I was copying files and
>> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
>> copying
>> >>> doesn't exist" - the option exists only because we couldn't come up
>> with
>> >>> another way to implement idempotent rename on GCS.
>> >>>
>> >>> What's your idea of how a safe retryable GCS rename() could work?
>> >>>
>> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:
>> 
>>  Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES
>> is
>>  unsafe because it will skip (src, dst) pairs where neither exist?
>> (it only
>>  looks if src exists)
>> 
>>  For GCS, we can construct a safe retryable rename() operation,
>> assuming
>>  that copy() and delete() are atomic for a single file or pair of
>> files.
>> 
>> 
>> 
>>  On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi 
>> wrote:
>> >
>> > On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>> >  wrote:
>> >>
>> >> As far as I know, the current implementation of file sinks is the
>> only
>> >> reason why the flag IGNORE_MISSING for copying even exists -
>> there's no
>> >> other compelling reason to justify it. We implement "rename" as
>> "copy, then
>> >> delete" (in a single DoFn), so for idempotency of this operation
>> we need to
>> >> ignore the copying of a non-existent file.
>> >>
>> >> I think the right way to go would be to change the implementation
>> of
>> >> renaming to have a @RequiresStableInput (or reshuffle) in the
>> middle, so
>> >> it's made of 2 individually idempotent operations:
>> >> 1) copy, which would fail if input is missing, and would overwrite
>> >> output if it exists
>> >> -- reshuffle --
>> >> 2) delete, which would not fail if input is missing.
>> >
>> >
>> > Something like this is needed only in streaming, right?
>> >
>> > Raghu.
>> >
>> >>
>> >> That way first everything is copied (possibly via multiple
>> attempts),
>> >> and then old files are deleted (possibly via multiple attempts).
>> >>
>> >> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri 
>> wrote:
>> >>>
>> >>> I agree that overwriting is more in line with user expectations.
>> >>> I believe that the sink should not ignore errors from the
>> filesystem
>> >>> layer. Instead, the FileSystem API should be more well defined.
>> >>> Examples: rename() and copy() should overwrite existing files at
>> the
>> >>> destination, copy() should have an ignore_missing flag.
>> >>>
>> >>> On Wed, Jan 31, 

Re: Filesystems.copy and .rename behavior

2018-02-02 Thread Raghu Angadi
In a batch pipeline, is it considered data loss if the stage fails
(assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
might be better to favor correctness and fail in the current implementation.

On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw  wrote:

> You could add a step to delete all of dest before a barrier and the
> step that does the rename as outlined. In that case, any dest file
> that exists must be good.
>
> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov 
> wrote:
> > I think this is still unsafe in case exists(dst) (e.g. this is a re-run
> of a
> > pipeline) but src is missing due to some bad reason. However it's
> probably
> > better than what we have (e.g. we currently certainly don't perform
> checksum
> > checks).
> >
> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri  wrote:
> >>
> >> For GCS, I would do what I believe we already do.
> >> rename(src, dst):
> >> - if !exists(src) and exists(dst) return 0
> >> - if !exists(src) and !exists(dst) return error
> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
> >> return 0 else delete(dst) }
> >> - Start a GCS copy from src to dst.
> >> - Wait for GCS copy to complete.
> >> - delete(src)
> >>
> >> For filesystems that don't have checksum() metadata, size() can be used
> >> instead.
> >>
> >> I've opened a bug to track this:
> >> https://issues.apache.org/jira/browse/BEAM-3600
> >>
> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov 
> >> wrote:
> >>>
> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
> files
> >>> that are missing for more ominous reasons than just this being a
> non-first
> >>> attempt at renaming src to dst. E.g. if there was a bug in
> constructing the
> >>> filename to be renamed, or if we somehow messed up the order of rename
> vs
> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead
> to
> >>> silent data loss (likely caught by unit tests though - so this is not a
> >>> super serious issue).
> >>>
> >>> Basically I just can't think of a case when I was copying files and
> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
> copying
> >>> doesn't exist" - the option exists only because we couldn't come up
> with
> >>> another way to implement idempotent rename on GCS.
> >>>
> >>> What's your idea of how a safe retryable GCS rename() could work?
> >>>
> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:
> 
>  Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES
> is
>  unsafe because it will skip (src, dst) pairs where neither exist? (it
> only
>  looks if src exists)
> 
>  For GCS, we can construct a safe retryable rename() operation,
> assuming
>  that copy() and delete() are atomic for a single file or pair of
> files.
> 
> 
> 
>  On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi 
> wrote:
> >
> > On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
> >  wrote:
> >>
> >> As far as I know, the current implementation of file sinks is the
> only
> >> reason why the flag IGNORE_MISSING for copying even exists -
> there's no
> >> other compelling reason to justify it. We implement "rename" as
> "copy, then
> >> delete" (in a single DoFn), so for idempotency of this operation we
> need to
> >> ignore the copying of a non-existent file.
> >>
> >> I think the right way to go would be to change the implementation of
> >> renaming to have a @RequiresStableInput (or reshuffle) in the
> middle, so
> >> it's made of 2 individually idempotent operations:
> >> 1) copy, which would fail if input is missing, and would overwrite
> >> output if it exists
> >> -- reshuffle --
> >> 2) delete, which would not fail if input is missing.
> >
> >
> > Something like this is needed only in streaming, right?
> >
> > Raghu.
> >
> >>
> >> That way first everything is copied (possibly via multiple
> attempts),
> >> and then old files are deleted (possibly via multiple attempts).
> >>
> >> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri  wrote:
> >>>
> >>> I agree that overwriting is more in line with user expectations.
> >>> I believe that the sink should not ignore errors from the
> filesystem
> >>> layer. Instead, the FileSystem API should be more well defined.
> >>> Examples: rename() and copy() should overwrite existing files at
> the
> >>> destination, copy() should have an ignore_missing flag.
> >>>
> >>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi 
> >>> wrote:
> 
>  Original mail mentions that output from second run of word_count
> is
>  ignored. That does not seem as safe as ignoring error from a
> second attempt
>  of a step. 

Re: Filesystems.copy and .rename behavior

2018-02-01 Thread Robert Bradshaw
You could add a step to delete all of dest before a barrier and the
step that does the rename as outlined. In that case, any dest file
that exists must be good.

On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov  wrote:
> I think this is still unsafe in case exists(dst) (e.g. this is a re-run of a
> pipeline) but src is missing due to some bad reason. However it's probably
> better than what we have (e.g. we currently certainly don't perform checksum
> checks).
>
> On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri  wrote:
>>
>> For GCS, I would do what I believe we already do.
>> rename(src, dst):
>> - if !exists(src) and exists(dst) return 0
>> - if !exists(src) and !exists(dst) return error
>> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>> return 0 else delete(dst) }
>> - Start a GCS copy from src to dst.
>> - Wait for GCS copy to complete.
>> - delete(src)
>>
>> For filesystems that don't have checksum() metadata, size() can be used
>> instead.
>>
>> I've opened a bug to track this:
>> https://issues.apache.org/jira/browse/BEAM-3600
>>
>> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov 
>> wrote:
>>>
>>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files
>>> that are missing for more ominous reasons than just this being a non-first
>>> attempt at renaming src to dst. E.g. if there was a bug in constructing the
>>> filename to be renamed, or if we somehow messed up the order of rename vs
>>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead to
>>> silent data loss (likely caught by unit tests though - so this is not a
>>> super serious issue).
>>>
>>> Basically I just can't think of a case when I was copying files and
>>> thinking "oh man, I wish it didn't give an error if the stuff I'm copying
>>> doesn't exist" - the option exists only because we couldn't come up with
>>> another way to implement idempotent rename on GCS.
>>>
>>> What's your idea of how a safe retryable GCS rename() could work?
>>>
>>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:

 Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is
 unsafe because it will skip (src, dst) pairs where neither exist? (it only
 looks if src exists)

 For GCS, we can construct a safe retryable rename() operation, assuming
 that copy() and delete() are atomic for a single file or pair of files.



 On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi  wrote:
>
> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>  wrote:
>>
>> As far as I know, the current implementation of file sinks is the only
>> reason why the flag IGNORE_MISSING for copying even exists - there's no
>> other compelling reason to justify it. We implement "rename" as "copy, 
>> then
>> delete" (in a single DoFn), so for idempotency of this operation we need
>> to
>> ignore the copying of a non-existent file.
>>
>> I think the right way to go would be to change the implementation of
>> renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
>> it's made of 2 individually idempotent operations:
>> 1) copy, which would fail if input is missing, and would overwrite
>> output if it exists
>> -- reshuffle --
>> 2) delete, which would not fail if input is missing.
>
>
> Something like this is needed only in streaming, right?
>
> Raghu.
>
>>
>> That way first everything is copied (possibly via multiple attempts),
>> and then old files are deleted (possibly via multiple attempts).
>>
>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri  wrote:
>>>
>>> I agree that overwriting is more in line with user expectations.
>>> I believe that the sink should not ignore errors from the filesystem
>>> layer. Instead, the FileSystem API should be more well defined.
>>> Examples: rename() and copy() should overwrite existing files at the
>>> destination, copy() should have an ignore_missing flag.
>>>
>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi 
>>> wrote:

 Original mail mentions that output from second run of word_count is
 ignored. That does not seem as safe as ignoring error from a second 
 attempt
 of a step. How do we know second run didn't run on different output?
 Overwriting seems more accurate than ignoring. Does handling this 
 error at
 sink level distinguish between the two (another run vs second attempt)?


 On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri 
 wrote:
>
> Yeah, another round of refactoring is due to move the rename via
> copy+delete logic up to the file-based sink level.
>
>
> On Wed, 

Re: Filesystems.copy and .rename behavior

2018-02-01 Thread Eugene Kirpichov
I think this is still unsafe in case exists(dst) (e.g. this is a re-run of
a pipeline) but src is missing due to some bad reason. However it's
probably better than what we have (e.g. we currently certainly don't
perform checksum checks).

On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri  wrote:

> For GCS, I would do what I believe we already do.
> rename(src, dst):
> - if !exists(src) and exists(dst) return 0
> - if !exists(src) and !exists(dst) return error
> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
> return 0 else delete(dst) }
> - Start a GCS copy from src to dst.
> - Wait for GCS copy to complete.
> - delete(src)
>
> For filesystems that don't have checksum() metadata, size() can be used
> instead.
>
> I've opened a bug to track this:
> https://issues.apache.org/jira/browse/BEAM-3600
>
> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov 
> wrote:
>
>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files
>> that are missing for more ominous reasons than just this being a non-first
>> attempt at renaming src to dst. E.g. if there was a bug in constructing the
>> filename to be renamed, or if we somehow messed up the order of rename vs
>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead to
>> silent data loss (likely caught by unit tests though - so this is not a
>> super serious issue).
>>
>> Basically I just can't think of a case when I was copying files and
>> thinking "oh man, I wish it didn't give an error if the stuff I'm copying
>> doesn't exist" - the option exists only because we couldn't come up with
>> another way to implement idempotent rename on GCS.
>>
>> What's your idea of how a safe retryable GCS rename() could work?
>>
>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:
>>
>>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is
>>> unsafe because it will skip (src, dst) pairs where neither exist? (it only
>>> looks if src exists)
>>>
>>> For GCS, we can construct a safe retryable rename() operation, assuming
>>> that copy() and delete() are atomic for a single file or pair of files.
>>>
>>>
>>>
>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi  wrote:
>>>
 On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov  wrote:

> As far as I know, the current implementation of file sinks is the only
> reason why the flag IGNORE_MISSING for copying even exists - there's no
> other compelling reason to justify it. We implement "rename" as "copy,
> then delete" (in a single DoFn), so for idempotency of this operation we
> need to ignore the copying of a non-existent file.
>
> I think the right way to go would be to change the implementation of
> renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
> it's made of 2 individually idempotent operations:
> 1) copy, which would fail if input is missing, and would overwrite
> output if it exists
> -- reshuffle --
> 2) delete, which would not fail if input is missing.
>

 Something like this is needed only in streaming, right?

 Raghu.


> That way first everything is copied (possibly via multiple attempts),
> and then old files are deleted (possibly via multiple attempts).
>
> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri  wrote:
>
>> I agree that overwriting is more in line with user expectations.
>> I believe that the sink should not ignore errors from the filesystem
>> layer. Instead, the FileSystem API should be more well defined.
>> Examples: rename() and copy() should overwrite existing files at the
>> destination, copy() should have an ignore_missing flag.
>>
>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi 
>> wrote:
>>
>>> Original mail mentions that output from second run of word_count is
>>> ignored. That does not seem as safe as ignoring error from a second
>>> attempt of a step. How do we know second run didn't run on different
>>> output? Overwriting seems more accurate than ignoring. Does handling
>>> this error at sink level distinguish between the two (another run vs
>>> second attempt)?
>>>
>>>
>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri 
>>> wrote:
>>>
 Yeah, another round of refactoring is due to move the rename via
 copy+delete logic up to the file-based sink level.

 On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath <
 chamik...@google.com> wrote:

> Good point. There's always the chance of step that performs final
> rename being retried. So we'll have to ignore this error at the sink 
> level.
> We don't necessarily have to do this at the FileSystem level though. I
> think the proper 

Re: Filesystems.copy and .rename behavior

2018-02-01 Thread Udi Meiri
For GCS, I would do what I believe we already do.
rename(src, dst):
- if !exists(src) and exists(dst) return 0
- if !exists(src) and !exists(dst) return error
- if exists(src) and exists(dst) { if checksum(src) == checksum(dst) return
0 else delete(dst) }
- Start a GCS copy from src to dst.
- Wait for GCS copy to complete.
- delete(src)

For filesystems that don't have checksum() metadata, size() can be used
instead.
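
The steps above can be sketched roughly as follows. This is only an
illustration in Python against a hypothetical in-memory store (InMemoryFS
and retryable_rename are made-up names, not Beam or GCS APIs), and it
assumes copy() and delete() are atomic per file:

```python
import hashlib


class InMemoryFS:
    """Hypothetical stand-in for a blob store; not a Beam or GCS API."""

    def __init__(self):
        self.blobs = {}

    def exists(self, path):
        return path in self.blobs

    def checksum(self, path):
        return hashlib.sha256(self.blobs[path]).hexdigest()

    def copy(self, src, dst):
        # Overwrites dst if it already exists.
        self.blobs[dst] = self.blobs[src]

    def delete(self, path):
        del self.blobs[path]


def retryable_rename(fs, src, dst):
    """Follows the listed steps; returns 0 on success, raises on error."""
    src_exists = fs.exists(src)
    dst_exists = fs.exists(dst)
    if not src_exists and dst_exists:
        return 0  # treated as "a previous attempt already finished"
    if not src_exists and not dst_exists:
        raise FileNotFoundError(f"neither {src} nor {dst} exists")
    if dst_exists:
        if fs.checksum(src) == fs.checksum(dst):
            return 0  # as the steps are written, src is left in place here
        fs.delete(dst)  # stale destination from an unrelated run
    fs.copy(src, dst)
    fs.delete(src)
    return 0
```

Calling retryable_rename() a second time with the same arguments returns 0
without touching dst, which is the retry behavior we want from the sink.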

I've opened a bug to track this:
https://issues.apache.org/jira/browse/BEAM-3600

On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov 
wrote:

> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files
> that are missing for more ominous reasons than just this being a non-first
> attempt at renaming src to dst. E.g. if there was a bug in constructing the
> filename to be renamed, or if we somehow messed up the order of rename vs
> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead to
> silent data loss (likely caught by unit tests though - so this is not a
> super serious issue).
>
> Basically I just can't think of a case when I was copying files and
> thinking "oh man, I wish it didn't give an error if the stuff I'm copying
> doesn't exist" - the option exists only because we couldn't come up with
> another way to implement idempotent rename on GCS.
>
> What's your idea of how a safe retryable GCS rename() could work?
>
> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:
>
>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is
>> unsafe because it will skip (src, dst) pairs where neither exist? (it only
>> looks if src exists)
>>
>> For GCS, we can construct a safe retryable rename() operation, assuming
>> that copy() and delete() are atomic for a single file or pair of files.
>>
>>
>>
>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi  wrote:
>>
>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov 
>>> wrote:
>>>
 As far as I know, the current implementation of file sinks is the only
 reason why the flag IGNORE_MISSING for copying even exists - there's no
 other compelling reason to justify it. We implement "rename" as "copy, then
 delete" (in a single DoFn), so for idempotency of this operation we need to
 ignore the copying of a non-existent file.

 I think the right way to go would be to change the implementation of
 renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
 it's made of 2 individually idempotent operations:
 1) copy, which would fail if input is missing, and would overwrite
 output if it exists
 -- reshuffle --
 2) delete, which would not fail if input is missing.

>>>
>>> Something like this is needed only in streaming, right?
>>>
>>> Raghu.
>>>
>>>
 That way first everything is copied (possibly via multiple attempts),
 and then old files are deleted (possibly via multiple attempts).

 On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri  wrote:

> I agree that overwriting is more in line with user expectations.
> I believe that the sink should not ignore errors from the filesystem
> layer. Instead, the FileSystem API should be more well defined.
> Examples: rename() and copy() should overwrite existing files at the
> destination, copy() should have an ignore_missing flag.
>
> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi 
> wrote:
>
>> Original mail mentions that output from second run of word_count is
>> ignored. That does not seem as safe as ignoring error from a second
>> attempt of a step. How do we know second run didn't run on different
>> output? Overwriting seems more accurate than ignoring. Does handling
>> this error at sink level distinguish between the two (another run vs
>> second attempt)?
>>
>>
>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri  wrote:
>>
>>> Yeah, another round of refactoring is due to move the rename via
>>> copy+delete logic up to the file-based sink level.
>>>
>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath 
>>> wrote:
>>>
 Good point. There's always the chance of step that performs final
 rename being retried. So we'll have to ignore this error at the sink
 level. We don't necessarily have to do this at the FileSystem level
 though. I think the proper behavior might be to raise an error for the
 rename at the FileSystem level if the destination already exists (or
 source doesn't exist) while ignoring that error (and possibly logging a
 warning) at the sink level.

 - Cham


 On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax 
 wrote:

> I think the idea was to ignore "already exists" 

Re: Filesystems.copy and .rename behavior

2018-02-01 Thread Eugene Kirpichov
Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files
that are missing for more ominous reasons than just this being a non-first
attempt at renaming src to dst. E.g. if there was a bug in constructing the
filename to be renamed, or if we somehow messed up the order of rename vs
cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead to
silent data loss (likely caught by unit tests though - so this is not a
super serious issue).
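
To make the failure mode concrete, here is a small sketch in Python. The
copy_all helper and its ignore_missing flag are hypothetical and stand in
for a copy with IGNORE_MISSING_FILES; the "store" is just a dict:

```python
def copy_all(store, pairs, ignore_missing=False):
    """Copy src -> dst inside a dict-backed store (hypothetical helper)."""
    copied = []
    for src, dst in pairs:
        if src not in store:
            if ignore_missing:
                continue  # skipped with no signal to the caller
            raise FileNotFoundError(src)
        store[dst] = store[src]
        copied.append(dst)
    return copied


store = {"tmp/shard-0": b"counts"}
# Bug in constructing the filename: "shard0" instead of "shard-0".
result = copy_all(store, [("tmp/shard0", "out-0")], ignore_missing=True)
print(result, "out-0" in store)
```

With the flag set, the call returns normally although nothing was copied;
without it, the same misspelled name raises FileNotFoundError.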

Basically I just can't think of a case when I was copying files and
thinking "oh man, I wish it didn't give an error if the stuff I'm copying
doesn't exist" - the option exists only because we couldn't come up with
another way to implement idempotent rename on GCS.

What's your idea of how a safe retryable GCS rename() could work?

On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:

> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is
> unsafe because it will skip (src, dst) pairs where neither exist? (it only
> looks if src exists)
>
> For GCS, we can construct a safe retryable rename() operation, assuming
> that copy() and delete() are atomic for a single file or pair of files.
>
>
>
> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi  wrote:
>
>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov 
>> wrote:
>>
>>> As far as I know, the current implementation of file sinks is the only
>>> reason why the flag IGNORE_MISSING for copying even exists - there's no
>>> other compelling reason to justify it. We implement "rename" as "copy, then
>>> delete" (in a single DoFn), so for idempotency of this operation we need to
>>> ignore the copying of a non-existent file.
>>>
>>> I think the right way to go would be to change the implementation of
>>> renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
>>> it's made of 2 individually idempotent operations:
>>> 1) copy, which would fail if input is missing, and would overwrite
>>> output if it exists
>>> -- reshuffle --
>>> 2) delete, which would not fail if input is missing.
>>>
>>
>> Something like this is needed only in streaming, right?
>>
>> Raghu.
>>
>>
>>> That way first everything is copied (possibly via multiple attempts),
>>> and then old files are deleted (possibly via multiple attempts).
>>>
>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri  wrote:
>>>
 I agree that overwriting is more in line with user expectations.
 I believe that the sink should not ignore errors from the filesystem
 layer. Instead, the FileSystem API should be more well defined.
 Examples: rename() and copy() should overwrite existing files at the
 destination, copy() should have an ignore_missing flag.

 On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi 
 wrote:

> Original mail mentions that output from second run of word_count is
> ignored. That does not seem as safe as ignoring error from a second
> attempt of a step. How do we know second run didn't run on different output?
> Overwriting seems more accurate than ignoring. Does handling this error at
> sink level distinguish between the two (another run vs second attempt)?
>
>
> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri  wrote:
>
>> Yeah, another round of refactoring is due to move the rename via
>> copy+delete logic up to the file-based sink level.
>>
>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath 
>> wrote:
>>
>>> Good point. There's always the chance of step that performs final
>>> rename being retried. So we'll have to ignore this error at the sink
>>> level. We don't necessarily have to do this at the FileSystem level
>>> though. I think the proper behavior might be to raise an error for the
>>> rename at the FileSystem level if the destination already exists (or
>>> source doesn't exist) while ignoring that error (and possibly logging
>>> a warning) at the sink level.
>>>
>>> - Cham
>>>
>>>
>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax  wrote:
>>>
 I think the idea was to ignore "already exists" errors. The reason
 being that any step in Beam can be executed multiple times, including
 the rename step. If the rename step gets run twice, the second run should
 succeed vacuously.


 On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri 
 wrote:

> Hi,
> I've been working on HDFS code for the Python SDK and I've noticed
> some behaviors which are surprising. I wanted to know if these
> behaviors are known and intended.
>
> 1. When renaming files during finalize_write, rename errors are
> ignored
> 

Re: Filesystems.copy and .rename behavior

2018-01-31 Thread Udi Meiri
Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is
unsafe because it will skip (src, dst) pairs where neither exists? (it only
checks whether src exists)

For GCS, we can construct a safe retryable rename() operation, assuming
that copy() and delete() are atomic for a single file or pair of files.



On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi  wrote:

> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov 
> wrote:
>
>> As far as I know, the current implementation of file sinks is the only
>> reason why the flag IGNORE_MISSING for copying even exists - there's no
>> other compelling reason to justify it. We implement "rename" as "copy, then
>> delete" (in a single DoFn), so for idempotency of this operation we need to
>> ignore the copying of a non-existent file.
>>
>> I think the right way to go would be to change the implementation of
>> renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
>> it's made of 2 individually idempotent operations:
>> 1) copy, which would fail if input is missing, and would overwrite output
>> if it exists
>> -- reshuffle --
>> 2) delete, which would not fail if input is missing.
>>
>
> Something like this is needed only in streaming, right?
>
> Raghu.
>
>
>> That way first everything is copied (possibly via multiple attempts), and
>> then old files are deleted (possibly via multiple attempts).
>>
>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri  wrote:
>>
>>> I agree that overwriting is more in line with user expectations.
>>> I believe that the sink should not ignore errors from the filesystem
>>> layer. Instead, the FileSystem API should be more well defined.
>>> Examples: rename() and copy() should overwrite existing files at the
>>> destination, copy() should have an ignore_missing flag.
>>>
>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi  wrote:
>>>
 Original mail mentions that output from second run of word_count is
 ignored. That does not seem as safe as ignoring error from a second attempt
 of a step. How do we know second run didn't run on different output?
 Overwriting seems more accurate than ignoring. Does handling this error at
 sink level distinguish between the two (another run vs second attempt)?


 On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri  wrote:

> Yeah, another round of refactoring is due to move the rename via
> copy+delete logic up to the file-based sink level.
>
> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath 
> wrote:
>
>> Good point. There's always the chance of step that performs final
>> rename being retried. So we'll have to ignore this error at the sink
>> level. We don't necessarily have to do this at the FileSystem level
>> though. I think the proper behavior might be to raise an error for the
>> rename at the FileSystem level if the destination already exists (or
>> source doesn't exist) while ignoring that error (and possibly logging a
>> warning) at the sink level.
>>
>> - Cham
>>
>>
>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax  wrote:
>>
>>> I think the idea was to ignore "already exists" errors. The reason
>>> being that any step in Beam can be executed multiple times, including
>>> the rename step. If the rename step gets run twice, the second run should
>>> succeed vacuously.
>>>
>>>
>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri  wrote:
>>>
 Hi,
 I've been working on HDFS code for the Python SDK and I've noticed
 some behaviors which are surprising. I wanted to know if these
 behaviors are known and intended.

 1. When renaming files during finalize_write, rename errors are
 ignored.
 For example, if I run wordcount twice using HDFS code I get a warning the
 second time because the file already exists:

 WARNING:root:Rename not successful:
 hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
 -> hdfs://counts2-0-of-1, libhdfs error in renaming
 hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
 to hdfs://counts2-0-of-1 with exceptions Unable to rename
 '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
 to '/counts2-0-of-1'.

 For GCS and local files there are no rename errors (in this case),
 since the rename operation silently overwrites existing destination 
 files.
 However, 

Re: Filesystems.copy and .rename behavior

2018-01-31 Thread Raghu Angadi
On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov 
wrote:

> As far as I know, the current implementation of file sinks is the only
> reason why the flag IGNORE_MISSING for copying even exists - there's no
> other compelling reason to justify it. We implement "rename" as "copy, then
> delete" (in a single DoFn), so for idempotency of this operation we need to
> ignore the copying of a non-existent file.
>
> I think the right way to go would be to change the implementation of
> renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
> it's made of 2 individually idempotent operations:
> 1) copy, which would fail if input is missing, and would overwrite output
> if it exists
> -- reshuffle --
> 2) delete, which would not fail if input is missing.
>

Something like this is needed only in streaming, right?

Raghu.


> That way first everything is copied (possibly via multiple attempts), and
> then old files are deleted (possibly via multiple attempts).
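
A minimal sketch of the two-phase idea quoted above, in Python over a
dict-backed store. The helper names are hypothetical, and the barrier
(reshuffle) between the phases is only implied by calling them in order:

```python
def copy_phase(store, pairs):
    """Phase 1: fail on a missing source, overwrite an existing destination."""
    for src, dst in pairs:
        if src not in store:
            raise FileNotFoundError(src)
        store[dst] = store[src]


def delete_phase(store, sources):
    """Phase 2: deleting an already-deleted source is not an error."""
    for src in sources:
        store.pop(src, None)


store = {"tmp/0": b"a", "tmp/1": b"b", "out-0": b"stale"}
pairs = [("tmp/0", "out-0"), ("tmp/1", "out-1")]

copy_phase(store, pairs)
copy_phase(store, pairs)  # retried attempt of phase 1: same result
# -- barrier: phase 2 starts only once every copy has succeeded --
delete_phase(store, [src for src, _ in pairs])
delete_phase(store, [src for src, _ in pairs])  # retried attempt of phase 2
print(store)
```

Each phase can be retried on its own, but a retry of phase 1 after phase 2
has started would fail, which is why the barrier matters.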
>
> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri  wrote:
>
>> I agree that overwriting is more in line with user expectations.
>> I believe that the sink should not ignore errors from the filesystem
>> layer. Instead, the FileSystem API should be more well defined.
>> Examples: rename() and copy() should overwrite existing files at the
>> destination, copy() should have an ignore_missing flag.
>>
>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi  wrote:
>>
>>> Original mail mentions that output from second run of word_count is
>>> ignored. That does not seem as safe as ignoring error from a second attempt
>>> of a step. How do we know second run didn't run on different output?
>>> Overwriting seems more accurate than ignoring. Does handling this error at
>>> sink level distinguish between the two (another run vs second attempt)?
>>>
>>>
>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri  wrote:
>>>
 Yeah, another round of refactoring is due to move the rename via
 copy+delete logic up to the file-based sink level.

 On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath 
 wrote:

> Good point. There's always the chance of step that performs final
> rename being retried. So we'll have to ignore this error at the sink 
> level.
> We don't necessarily have to do this at the FileSystem level though. I
> think the proper behavior might be to raise an error for the rename at the
> FileSystem level if the destination already exists (or source doesn't
> exist) while ignoring that error (and possibly logging a warning) at the
> sink level.
>
> - Cham
>
>
> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax  wrote:
>
>> I think the idea was to ignore "already exists" errors. The reason
>> being that any step in Beam can be executed multiple times, including the
>> rename step. If the rename step gets run twice, the second run should
>> succeed vacuously.
>>
>>
>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri  wrote:
>>
>>> Hi,
>>> I've been working on HDFS code for the Python SDK and I've noticed
>>> some behaviors which are surprising. I wanted to know if these behaviors
>>> are known and intended.
>>>
>>> 1. When renaming files during finalize_write, rename errors are
>>> ignored.
>>> For example, if I run wordcount twice using HDFS code I get a warning
>>> the second time because the file already exists:
>>>
>>> WARNING:root:Rename not successful:
>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>> -> hdfs://counts2-0-of-1, libhdfs error in renaming
>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>> to hdfs://counts2-0-of-1 with exceptions Unable to rename
>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>>> to '/counts2-0-of-1'.
>>>
>>> For GCS and local files there are no rename errors (in this case),
>>> since the rename operation silently overwrites existing destination
>>> files. However, blindly ignoring these errors might make the pipeline
>>> report success even though output files are missing.
>>>
>>> 2. Output files (--output) overwrite existing files.
>>>
>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK
>>> doesn't use Filesystem.Rename().
>>>
>>> Thanks,
>>> - Udi
>>>
>>
>>
>>>


Re: Filesystems.copy and .rename behavior

2018-01-31 Thread Eugene Kirpichov
I agree that using an atomic rename operation is even better. I'm mainly
opposed to having a copy option that ignores missing files, and to our
implementation of rename using that option, because it's unsafe.
Unfortunately GCS doesn't have an atomic rename, so I'm not sure what the
best way to go is for GCS without introducing the unsafe operations.

On Wed, Jan 31, 2018, 3:39 PM Chamikara Jayalath 
wrote:

> Agree with what Robert said. We have a rename() operation in the
> FileSystem abstraction and some filesystems might be able to implement
> this more efficiently than copy+delete. Also note that the same issue could
> arise in any other usage of the rename operation. So I agree that a
> retry-tolerant version of rename will be useful. Note that we can do this
> without making all FileSystem.rename() implementations unsafe. For example,
> in Java, the IGNORE_MISSING_FILES option is implemented by filtering out
> non-existing files in FileSystems.rename() before invoking
> FileSystem.rename().
>
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L316
>
> - Cham
>
>
> On Wed, Jan 31, 2018 at 3:14 PM Robert Bradshaw 
> wrote:
>
>> For very large filesets, it may be too much to assume that the copy
>> succeeds in its entirety on the first try. (I suppose we could chunk
>> copies into individual retryable bundles, but this may not respect the
>> filesystem's default chunking/bulk operations.) The other downside of
>> copying entirely before any deletion is that unless the filesystem is
>> smart about copies, it may double the required intermediate storage
>> size (vs. deleting once a particular shard has been copied). Also,
>> some filesystems may support rename (even bulk rename) that's cheaper
>> than copy + delete. For these reasons I think an (optionally
>> retry-tolerant) bulk rename makes sense as an operation on the
>> filesystem API rather than implemented as a composite operation built
>> on lower-level filesystem primitives.
>>
>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov 
>> wrote:
>> > As far as I know, the current implementation of file sinks is the only
>> > reason why the flag IGNORE_MISSING for copying even exists - there's no
>> > other compelling reason to justify it. We implement "rename" as "copy,
>> > then delete" (in a single DoFn), so for idempotency of this operation we
>> > need to ignore the copying of a non-existent file.
>> >
>> > I think the right way to go would be to change the implementation of
>> > renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
>> > it's made of 2 individually idempotent operations:
>> > 1) copy, which would fail if input is missing, and would overwrite
>> > output if it exists
>> > -- reshuffle --
>> > 2) delete, which would not fail if input is missing.
>> >
>> > That way first everything is copied (possibly via multiple attempts),
>> > and then old files are deleted (possibly via multiple attempts).
>> >
>> > On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri  wrote:
>> >>
>> >> I agree that overwriting is more in line with user expectations.
>> >> I believe that the sink should not ignore errors from the filesystem
>> >> layer. Instead, the FileSystem API should be more well defined.
>> >> Examples: rename() and copy() should overwrite existing files at the
>> >> destination, copy() should have an ignore_missing flag.
>> >>
>> >> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi 
>> wrote:
>> >>>
>> >>> Original mail mentions that output from second run of word_count is
>> >>> ignored. That does not seem as safe as ignoring error from a second
>> >>> attempt of a step. How do we know second run didn't run on different
>> >>> output? Overwriting seems more accurate than ignoring. Does handling
>> >>> this error at sink level distinguish between the two (another run vs
>> >>> second attempt)?
>> >>>
>> >>>
>> >>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri  wrote:
>> 
>>  Yeah, another round of refactoring is due to move the rename via
>>  copy+delete logic up to the file-based sink level.
>> 
>> 
>>  On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath
>>  wrote:
>> >
>> > Good point. There's always the chance of step that performs final
>> > rename being retried. So we'll have to ignore this error at the sink
>> > level. We don't necessarily have to do this at the FileSystem level
>> > though. I think the proper behavior might be to raise an error for the
>> > rename at the FileSystem level if the destination already exists (or
>> > source doesn't exist) while ignoring that error (and possibly logging
>> > a warning) at the sink level.
>> >
>> > - Cham
>> >
>> >
>> > On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax 
>> 

Re: Filesystems.copy and .rename behavior

2018-01-31 Thread Chamikara Jayalath
Agree with what Robert said. We have a rename() operation in the FileSystem
abstraction and some filesystems might be able to implement this more
efficiently than copy+delete. Also note that the same issue could arise in
any other usage of the rename operation. So I agree that a retry-tolerant
version of rename will be useful. Note that we can do this without making
all FileSystem.rename() implementations unsafe. For example, in Java, the
IGNORE_MISSING_FILES option is implemented by filtering out non-existing
files in FileSystems.rename() before invoking FileSystem.rename().

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L316
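
The same layering could look roughly like this in Python. This is a sketch
of the idea only, not the Java code linked above; both function names are
hypothetical and the store is a plain dict:

```python
def make_strict_rename(store):
    """Build a strict bulk rename over a dict; raises if a source is missing."""
    def strict_rename(srcs, dsts):
        for src, dst in zip(srcs, dsts):
            store[dst] = store.pop(src)  # KeyError when src is absent
    return strict_rename


def rename_ignoring_missing(exists, strict_rename, srcs, dsts):
    """Drop pairs whose source is gone, then call the strict rename."""
    if len(srcs) != len(dsts):
        raise ValueError("srcs and dsts must have the same length")
    kept = [(s, d) for s, d in zip(srcs, dsts) if exists(s)]
    if kept:
        kept_srcs, kept_dsts = zip(*kept)
        strict_rename(list(kept_srcs), list(kept_dsts))
    return kept


store = {"tmp/a": b"1"}
kept = rename_ignoring_missing(
    store.__contains__, make_strict_rename(store),
    ["tmp/a", "tmp/b"], ["a", "b"])
print(kept, store)
```

The per-filesystem rename stays strict; only the wrapper decides to skip
missing sources, so other callers of the strict version still get errors.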

- Cham

On Wed, Jan 31, 2018 at 3:14 PM Robert Bradshaw  wrote:

> For very large filesets, it may be too much to assume that the copy
> succeeds in its entirety on the first try. (I suppose we could chunk
> copies into individual retryable bundles, but this may not respect the
> filesystem's default chunking/bulk operations.) The other downside of
> copying entirely before any deletion is that unless the filesystem is
> smart about copies, it may double the required intermediate storage
> size (vs. deleting once a particular shard has been copied). Also,
> some filesystems may support rename (even bulk rename) that's cheaper
> than copy + delete. For these reasons I think an (optionally
> retry-tolerant) bulk rename makes sense as an operation on the
> filesystem API rather than implemented as a composite operation built
> on lower-level filesystem primitives.
>
> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov 
> wrote:
> > As far as I know, the current implementation of file sinks is the only
> > reason why the flag IGNORE_MISSING for copying even exists - there's no
> > other compelling reason to justify it. We implement "rename" as "copy,
> > then delete" (in a single DoFn), so for idempotency of this operation we
> > need to ignore the copying of a non-existent file.
> >
> > I think the right way to go would be to change the implementation of
> > renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
> > it's made of 2 individually idempotent operations:
> > 1) copy, which would fail if input is missing, and would overwrite
> > output if it exists
> > -- reshuffle --
> > 2) delete, which would not fail if input is missing.
> >
> > That way first everything is copied (possibly via multiple attempts), and
> > then old files are deleted (possibly via multiple attempts).
> >
> > On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri  wrote:
> >>
> >> I agree that overwriting is more in line with user expectations.
> >> I believe that the sink should not ignore errors from the filesystem
> >> layer. Instead, the FileSystem API should be more well defined.
> >> Examples: rename() and copy() should overwrite existing files at the
> >> destination, copy() should have an ignore_missing flag.
> >>
> >> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi 
> wrote:
> >>>
> >>> Original mail mentions that output from second run of word_count is
> >>> ignored. That does not seem as safe as ignoring error from a second
> >>> attempt of a step. How do we know second run didn't run on different
> >>> output? Overwriting seems more accurate than ignoring. Does handling
> >>> this error at sink level distinguish between the two (another run vs
> >>> second attempt)?
> >>>
> >>>
> >>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri  wrote:
> 
>  Yeah, another round of refactoring is due to move the rename via
>  copy+delete logic up to the file-based sink level.
> 
> 
>  On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath 
>  wrote:
> >
> > Good point. There's always the chance of step that performs final
> > rename being retried. So we'll have to ignore this error at the sink
> > level. We don't necessarily have to do this at the FileSystem level
> > though. I think the proper behavior might be to raise an error for the
> > rename at the FileSystem level if the destination already exists (or
> > source doesn't exist) while ignoring that error (and possibly logging
> > a warning) at the sink level.
> >
> > - Cham
> >
> >
> > On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax  wrote:
> >>
> >> I think the idea was to ignore "already exists" errors. The reason
> >> being that any step in Beam can be executed multiple times,
> >> including the rename step. If the rename step gets run twice, the
> >> second run should succeed vacuously.
> >>
> >>
> >> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri 
> wrote:
> >>>
> >>> Hi,
> >>> I've been working on HDFS code for the Python SDK and I've noticed
> >>> some behaviors which are surprising. I wanted to know if these
> 

Re: Filesystems.copy and .rename behavior

2018-01-31 Thread Udi Meiri
I agree that overwriting is more in line with user expectations.
I believe that the sink should not ignore errors from the filesystem layer.
Instead, the FileSystem API should be better defined.
Examples: rename() and copy() should overwrite existing files at the
destination, copy() should have an ignore_missing flag.
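
A minimal sketch of the copy() semantics proposed above, assuming a local
filesystem. The helper name copy_file and its return value are hypothetical
and are not part of the Beam FileSystem API; only the overwrite behavior and
the ignore_missing flag come from the proposal.

```python
import os
import shutil
import tempfile


def copy_file(src, dst, ignore_missing=False):
    """Copy src to dst, overwriting dst if it already exists.

    Hypothetical helper for illustration; not the Beam FileSystem API.
    Returns True if a copy happened, False if src was missing and
    ignore_missing was set.
    """
    if not os.path.exists(src):
        if ignore_missing:
            return False
        raise FileNotFoundError(src)
    # shutil.copyfile replaces the contents of an existing destination file.
    shutil.copyfile(src, dst)
    return True


def demo_copy():
    with tempfile.TemporaryDirectory() as d:
        src = os.path.join(d, "temp-shard")
        dst = os.path.join(d, "counts-0-of-1")
        with open(src, "w") as f:
            f.write("new")
        with open(dst, "w") as f:
            f.write("old")
        copied = copy_file(src, dst)  # overwrites the old destination
        with open(dst) as f:
            content = f.read()
        # A missing source is skipped instead of raising.
        skipped = copy_file(os.path.join(d, "gone"), dst, ignore_missing=True)
        return copied, content, skipped
```

With ignore_missing left at its default, a missing source still raises, so
the caller has to opt in to the lenient behavior.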


Re: Filesystems.copy and .rename behavior

2018-01-31 Thread Raghu Angadi
The original mail mentions that output from a second run of word_count is
ignored. That does not seem as safe as ignoring an error from a second
attempt of a step. How do we know the second run didn't run on different
output? Overwriting seems more accurate than ignoring. Does handling this
error at the sink level distinguish between the two (another run vs. a
second attempt)?


Re: Filesystems.copy and .rename behavior

2018-01-31 Thread Udi Meiri
Yeah, another round of refactoring is due to move the rename via
copy+delete logic up to the file-based sink level.
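
A rough sketch of what rename via copy+delete could look like at the sink
level, assuming a local filesystem. The function name finalize and the
(temp, final) pair layout are hypothetical, not existing Beam code. Note the
assumption in the code that a missing temp file with an existing destination
means an earlier attempt already copied it; existence alone cannot prove that.

```python
import os
import shutil
import tempfile


def finalize(pairs):
    """Move (temp, final) pairs in two retry-tolerant phases.

    Hypothetical sink-level helper on the local filesystem, not Beam code.
    """
    # Phase 1: copy every temp file still present. Overwriting the
    # destination makes a repeated copy harmless.
    for temp, final in pairs:
        if os.path.exists(temp):
            shutil.copyfile(temp, final)
        elif not os.path.exists(final):
            raise FileNotFoundError(f"neither {temp} nor {final} exists")
        # else: assumption: an earlier attempt already copied this file.
    # Phase 2: delete temp files only after every copy has succeeded.
    for temp, _ in pairs:
        if os.path.exists(temp):
            os.remove(temp)


def demo_finalize():
    with tempfile.TemporaryDirectory() as d:
        pairs = []
        for i in range(2):
            temp = os.path.join(d, f"temp-{i}")
            final = os.path.join(d, f"out-{i}")
            with open(temp, "w") as f:
                f.write(f"shard {i}")
            pairs.append((temp, final))
        finalize(pairs)
        finalize(pairs)  # simulated retry: nothing left to copy, no error
        finals = []
        for _, final in pairs:
            with open(final) as f:
                finals.append(f.read())
        temps_left = [os.path.exists(t) for t, _ in pairs]
        return finals, temps_left
```

Because no temp file is deleted until all copies are done, a retry that
starts partway through phase 1 still finds every source it needs.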


Re: Filesystems.copy and .rename behavior

2018-01-31 Thread Chamikara Jayalath
Good point. There's always a chance that the step performing the final
rename is retried, so we'll have to ignore this error at the sink level. We
don't necessarily have to do this at the FileSystem level though. I think
the proper behavior might be to raise an error for the rename at the
FileSystem level if the destination already exists (or the source doesn't
exist) while ignoring that error (and possibly logging a warning) at the
sink level.

- Cham
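
The split described above could be sketched roughly as follows, assuming a
local filesystem. Both function names (strict_rename, sink_rename) are
hypothetical and do not correspond to the Beam FileSystem or sink classes.
The existence checks are not atomic with the rename, so this only illustrates
the layering, not a race-free implementation.

```python
import logging
import os
import tempfile


def strict_rename(src, dst):
    """Filesystem-level rename that raises instead of guessing (hypothetical)."""
    if not os.path.exists(src):
        raise FileNotFoundError(src)
    if os.path.exists(dst):
        raise FileExistsError(dst)
    os.rename(src, dst)


def sink_rename(src, dst):
    """Sink-level wrapper: log a warning and continue on the two errors above."""
    try:
        strict_rename(src, dst)
        return True
    except (FileExistsError, FileNotFoundError) as e:
        logging.warning("Rename not successful: %s -> %s (%r)", src, dst, e)
        return False


def demo_sink_rename():
    with tempfile.TemporaryDirectory() as d:
        src = os.path.join(d, "temp-shard")
        dst = os.path.join(d, "counts2-0-of-1")
        with open(src, "w") as f:
            f.write("data")
        first = sink_rename(src, dst)   # performs the rename
        second = sink_rename(src, dst)  # retry: src is gone, warning logged
        return first, second
```

The filesystem layer stays strict and reports exactly what went wrong; the
decision to tolerate the error lives in one place at the sink level.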


Re: Filesystems.copy and .rename behavior

2018-01-30 Thread Reuven Lax
I think the idea was to ignore "already exists" errors. The reason is
that any step in Beam can be executed multiple times, including the rename
step. If the rename step gets run twice, the second run should succeed
vacuously.
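
As a small illustration of a rename whose second run succeeds vacuously, here
is a sketch on the local filesystem. The helper name idempotent_rename is
hypothetical, and treating "source gone, destination present" as success is
an assumption of the sketch rather than documented Beam behavior.

```python
import os
import tempfile


def idempotent_rename(src, dst):
    """Rename src to dst; repeating the call after success is a no-op.

    Hypothetical helper on the local filesystem, not the Beam API.
    """
    if os.path.exists(src):
        os.replace(src, dst)  # replaces an existing dst
        return
    if os.path.exists(dst):
        return  # assumption: an earlier attempt already did the rename
    raise FileNotFoundError(f"neither {src} nor {dst} exists")


def demo_idempotent():
    with tempfile.TemporaryDirectory() as d:
        src = os.path.join(d, "temp-shard")
        dst = os.path.join(d, "out")
        with open(src, "w") as f:
            f.write("data")
        idempotent_rename(src, dst)
        idempotent_rename(src, dst)  # second run succeeds vacuously
        with open(dst) as f:
            return f.read()
```

The call still fails when neither file exists, so a genuinely missing output
is not reported as success.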


Filesystems.copy and .rename behavior

2018-01-30 Thread Udi Meiri
Hi,
I've been working on HDFS code for the Python SDK and I've noticed some
behaviors which are surprising. I wanted to know if these behaviors are
known and intended.

1. When renaming files during finalize_write, rename errors are ignored.
For example, if I run wordcount twice using HDFS code I get a warning the
second time because the file already exists:

WARNING:root:Rename not successful:
hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
-> hdfs://counts2-0-of-1, libhdfs error in renaming
hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
to hdfs://counts2-0-of-1 with exceptions Unable to rename
'/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
to '/counts2-0-of-1'.

For GCS and local files there are no rename errors (in this case), since
the rename operation silently overwrites existing destination files.
However, blindly ignoring these errors might make the pipeline report
success even though output files are missing.

2. Output files (--output) overwrite existing files.

3. The Python SDK doesn't use Filesystems.copy(). The Java SDK doesn't use
Filesystem.Rename().

Thanks,
- Udi
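
One way to guard against the risk in point 1 (success reported while output
files are missing) would be to check the expected outputs after the rename
step. The sketch below assumes local paths; the helper name missing_outputs
is hypothetical and is not an existing Beam function.

```python
import os
import tempfile


def missing_outputs(expected_paths):
    """Return the expected output paths that do not exist (hypothetical check)."""
    return [p for p in expected_paths if not os.path.exists(p)]


def demo_missing():
    with tempfile.TemporaryDirectory() as d:
        present = os.path.join(d, "counts2-0-of-2")
        absent = os.path.join(d, "counts2-1-of-2")
        open(present, "w").close()
        # Only the shard that was never written is reported.
        return [os.path.basename(p) for p in missing_outputs([present, absent])]
```

A sink could fail the step when this list is non-empty instead of only
logging a warning for each failed rename.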

