Hey Josh,

Yes! Spot on -- that's exactly what we were doing. That solved it, thanks!
- Peter

On Tue, Jan 20, 2015 at 9:35 PM, Josh Wills wrote:
> Hey Peter,
>
> So I wrote this integration test to test out how Crunch-on-Spark handled
> inter-job failures, and it seemed like things happened correctly when I
> called a Spark job that had a failing function. However, we also have a
> condition that a no-op job (i.e., calling Pipeline.done when there are no
> targets to write out) will also return succeeded() == true. I'm wondering
> if that is what is happening here: are you calling run() at one point,
> getting a failure in the PipelineResult that is returned, and then calling
> done() and getting the dummy PipelineResult that always returns
> succeeded() == true b/c it didn't do anything?
>
> J
>
> package org.apache.crunch;
>
> import org.apache.crunch.impl.spark.SparkPipeline;
> import org.apache.crunch.io.To;
> import org.apache.crunch.test.TemporaryPath;
> import org.apache.crunch.types.writable.Writables;
> import org.junit.Rule;
> import org.junit.Test;
>
> import static org.junit.Assert.assertEquals;
>
> public class SparkFailureIT {
>   @Rule
>   public TemporaryPath tmpDir = new TemporaryPath();
>
>   @Test
>   public void testFailure() throws Exception {
>     Pipeline p = new SparkPipeline("local", "failure");
>     PCollection<String> shakes =
>         p.readTextFile(tmpDir.copyResourceFileName("shakes.txt"));
>     PCollection<String> lower = shakes.parallelDo(new FailureFn(),
>         Writables.strings());
>     lower.write(To.textFile(tmpDir.getPath("out")));
>     assertEquals(false, p.done().succeeded());
>     assertEquals(false, p.done().succeeded());
>   }
>
>   public static class FailureFn extends DoFn<String, String> {
>     @Override
>     public void process(String input, Emitter<String> emitter) {
>       throw new RuntimeException("Oh no");
>     }
>   }
> }
>
> On Tue, Jan 20, 2015 at 10:01 AM, Peter Dolan wrote:
>
>> Thanks Josh!
>>
>> On Tue, Jan 20, 2015 at 9:58 AM, Josh Wills wrote:
>>
>>> Okay, created https://issues.apache.org/jira/browse/CRUNCH-488 to track
>>> it. Should get a patch together by tmrw.
>>>
>>> J
>>>
>>> On Mon, Jan 19, 2015 at 4:57 PM, Peter Dolan wrote:
>>>
>>>> So far I've only tried this in the SparkPipeline. In MemPipeline the
>>>> entire JVM dies, so we don't get to determine success or failure.
>>>>
>>>> On Mon, Jan 19, 2015 at 10:47 AM, Josh Wills wrote:
>>>>
>>>>> No, that's not good; we should fix that. Is it only in the
>>>>> SparkPipeline that the situation occurs?
>>>>>
>>>>> On Mon, Jan 19, 2015 at 8:28 AM, Peter Dolan wrote:
>>>>>
>>>>>> Hi Crunchers,
>>>>>>
>>>>>> At Nuna we've been using Crunch extensively, and I'm really thrilled
>>>>>> with it. It's excellent. There are of course some rough edges, though.
>>>>>>
>>>>>> Today I ran into some exceptions being thrown in the Spark pipeline,
>>>>>> and am curious why they weren't resulting in the PipelineResult
>>>>>> reporting failure. In particular, my Spark pipeline (running with a
>>>>>> local Spark instance, that is, with the Spark master set to
>>>>>> "local[16]") failed with an IOException when the machine ran out of
>>>>>> space in /tmp/. The PipelineResult retrieved by Pipeline#done returned
>>>>>> true from PipelineResult#succeeded.
>>>>>>
>>>>>> I've seen this in a couple of other contexts, for example when a MapFn
>>>>>> threw an exception within MapFn#map, which did not result in a false
>>>>>> success value.
>>>>>>
>>>>>> Is this expected / intended behavior? Should I be getting at the
>>>>>> success or failure of the execution some other way?
>>>>>>
>>>>>> Thanks!
>>>>>> - Peter
>>>
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
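The pitfall Josh describes can be modeled in a few lines of plain Java. The failure is reported by the PipelineResult that comes back from run(), while a later done() with nothing left to write returns a result that reports success. This is a hedged, self-contained sketch, not Crunch code. FakePipeline, Result, and allSucceeded are hypothetical stand-ins. In real Crunch code, the analogous move is to keep the PipelineResult returned by Pipeline#run() and check succeeded() on it, rather than relying only on the result of Pipeline#done().

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineResultCheck {

    /** Hypothetical stand-in for a pipeline result; NOT Crunch's PipelineResult. */
    static final class Result {
        private final boolean succeeded;

        Result(boolean succeeded) {
            this.succeeded = succeeded;
        }

        boolean succeeded() {
            return succeeded;
        }
    }

    /**
     * Hypothetical stand-in modeling the behavior described in the thread:
     * run() executes the pending jobs, and done() with nothing pending
     * returns a no-op result that always reports success.
     */
    static final class FakePipeline {
        private final List<Boolean> pendingJobOutcomes = new ArrayList<>();

        void addJob(boolean willSucceed) {
            pendingJobOutcomes.add(willSucceed);
        }

        Result run() {
            boolean ok = true;
            for (boolean outcome : pendingJobOutcomes) {
                ok &= outcome; // any failing job fails the whole run
            }
            pendingJobOutcomes.clear(); // the jobs have now been executed
            return new Result(ok);
        }

        Result done() {
            if (pendingJobOutcomes.isEmpty()) {
                return new Result(true); // no-op: nothing ran, so "success"
            }
            return run();
        }
    }

    /** Returns true only if every result collected along the way succeeded. */
    static boolean allSucceeded(List<Result> results) {
        for (Result r : results) {
            if (!r.succeeded()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        FakePipeline p = new FakePipeline();
        p.addJob(false); // e.g. a DoFn that throws, or /tmp filling up

        List<Result> results = new ArrayList<>();
        results.add(p.run());  // reports the failure
        results.add(p.done()); // no-op, reports success

        System.out.println("done() alone says succeeded: " + results.get(1).succeeded());
        System.out.println("all results succeeded: " + allSucceeded(results));
    }
}
```

Running main prints that done() alone reports success (true), while the combined check over both results reports failure (false). That matches the symptom in the thread.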
