GitHub user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/3832#issuecomment-68580794
  
    @tdas I've updated this PR and added a test case.  My test case uses calls 
inside of a `transform()` call to emulate what Streaming's `saveAsHadoopFiles` 
operation does.  Is this a valid use of `transform()` or am I breaking rules by 
having actions in my transform function?  My gut says that we shouldn't endorse 
/ recommend this for the same reason that we advise against using accumulators 
inside of `map()` tasks: the transform call might get evaluated multiple times if 
caching isn't used, which makes it possible to write programs whose behavior 
changes depending on whether caching is enabled.
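
    To make the re-evaluation concern concrete, here is a toy sketch in plain Python rather than Spark. `LazyDataset`, `run`, and `transform_body` are hypothetical names made up for illustration; they only model the idea that a lazily evaluated function with a side effect runs once per consumer unless its result is cached.

```python
class LazyDataset:
    """Toy stand-in for a lazily evaluated dataset (hypothetical, not a Spark API)."""

    def __init__(self, compute, cached=False):
        self._compute = compute        # function that produces the data
        self._cached = cached          # whether to memoize the first result
        self._value = None
        self._materialized = False

    def collect(self):
        # With caching enabled, the compute function runs at most once.
        if self._cached and self._materialized:
            return self._value
        result = self._compute()
        if self._cached:
            self._value, self._materialized = result, True
        return result


def run(cached):
    side_effects = []  # records every execution of the "transform" body

    def transform_body():
        # Side effect, standing in for an action placed inside a transform function.
        side_effects.append("save")
        return [1, 2, 3]

    ds = LazyDataset(transform_body, cached=cached)
    ds.collect()  # first downstream consumer
    ds.collect()  # second downstream consumer
    return len(side_effects)


uncached_runs = run(cached=False)
cached_runs = run(cached=True)
print(uncached_runs, cached_runs)
```

    In this model the side effect fires once per consumer without caching and only once with it, which is the kind of caching-dependent behavior I would rather not recommend.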
    
    I wasn't able to get the existing "recovery with saveAsNewAPIHadoopFiles 
operation" test to fail, though, even though I discovered this bug while 
refactoring that test in my other PR.  I think that the issue is that the 
`saveAsNewAPIHadoopFiles` jobs failed but did not trigger a failure of 
the other actions / transformations in that batch, so we still got the correct 
output even though the batch completion event wasn't posted to the listener 
bus.  The current tests rely on wall-clock time to detect when batches have 
been processed and hence didn't detect that the batch completion event was 
missing.  I noticed that the StreamingListener API doesn't really have any 
events for job / batch failures, but that's a topic for a separate PR.
    
    I was about to write that this bug might not actually affect users who 
don't use `transform` but it still impacts users in the partial-failure case 
where they've used PairDStreamFunctions.saveAsHadoopFiles() but a batch fails 
with partially-written output: an individual output _partition_ might be 
atomically committed to the output directory (i.e. if the file exists, then it 
has the right contents), but I think we can still wind up in a scenario where 
only a subset of the partitions are written and the non-empty output directory 
prevents the recovery from recomputing the missing partitions.
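
    The partial-failure scenario can be sketched with local files in plain Python. This is only a model under stated assumptions: `write_partition` and `save_batch` are hypothetical helpers, and the "refuse to write into a non-empty directory" check is my simplified stand-in for whatever check blocks the rewrite during recovery, not the actual Hadoop or Spark code path.

```python
import os
import tempfile


def write_partition(out_dir, index, data):
    # Write under a temporary name, then rename, so each partition file
    # is either absent or complete (per-partition atomic commit).
    tmp_path = os.path.join(out_dir, f".part-{index:05d}.tmp")
    final_path = os.path.join(out_dir, f"part-{index:05d}")
    with open(tmp_path, "w") as f:
        f.write(data)
    os.replace(tmp_path, final_path)


def save_batch(out_dir, partitions, fail_at=None):
    # Assumed behavior: refuse to write into a non-empty output directory.
    if os.path.isdir(out_dir) and os.listdir(out_dir):
        raise FileExistsError(f"output directory {out_dir} is non-empty")
    os.makedirs(out_dir, exist_ok=True)
    for i, data in enumerate(partitions):
        if i == fail_at:
            raise RuntimeError(f"simulated failure before partition {i}")
        write_partition(out_dir, i, data)


partitions = ["a", "b", "c"]
with tempfile.TemporaryDirectory() as root:
    out_dir = os.path.join(root, "batch-0")
    try:
        save_batch(out_dir, partitions, fail_at=2)  # first attempt dies part-way
    except RuntimeError:
        pass
    try:
        save_batch(out_dir, partitions)             # recovery attempt
        recovered = True
    except FileExistsError:
        recovered = False
    written = sorted(os.listdir(out_dir))

print(written, recovered)
```

    Each file that exists is complete, but the third partition is never written because the retry is rejected by the leftover directory.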

