Thanks Josh, that nailed it. Are we planning on shipping 1.0.0 soon? Need any help with that?

Now that it's running: I generated the same pipeline directly in Spark as well (without Crunch, using plain RDDs) and compared the DAGs Spark generated. They look completely different, with Crunch adding many intermediate steps at each stage (map, mapPartitions, mapPartitionsWithIndex, to name a few). Can you give me some insight into how Crunch submits the jobs to Spark? I'm going to do some benchmarking, but will these extra steps add overhead?

Thanks,
Ron.
On Fri, May 26, 2017 at 10:30 PM Josh Wills <[email protected]> wrote:

> Hey Ron,
>
> If I had to guess, I'd suspect you'll need to run against a version built
> from master that has these patches included:
>
> https://issues.apache.org/jira/browse/CRUNCH-618
>
> Josh
>
> On Fri, May 26, 2017 at 12:27 PM, Ron Hashimshony <[email protected]> wrote:
>
>> Hi,
>> I have a pipeline that runs fine with MRPipeline. I tried to replace it
>> with SparkPipeline, and am getting an AbstractMethodError.
>> This is the code (reduced to highlight the problem):
>>
>> Pipeline pipe = new SparkPipeline("local", "test");
>>
>> PCollection<siteInfo> siteInfoInput = pipe.read(From.avroFile(input,
>>     Avros.records(siteInfo.class)));
>>
>> PTable<Long, Integer> siteToPart = siteInfoInput.parallelDo(
>>     new MapFn<siteInfo, Pair<Long, Integer>>() {
>>       @Override
>>       public Pair<Long, Integer> map(siteInfo input) {
>>         return Pair.of(input.getSiteId(), input.getPartitionId());
>>       }
>>     }, Avros.tableOf(Avros.longs(), Avros.ints()));
>>
>> siteToPart.write(To.textFile(output));
>> pipe.done();
>>
>> The exception is this:
>>
>> java.lang.AbstractMethodError:
>>     org.apache.crunch.impl.spark.fn.CrunchPairTuple2.call(Ljava/lang/Object;)Ljava/util/Iterator;
>>   at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>   at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> Tried with both Crunch 0.13.0 and 0.15.0, with Spark 2.1.
>>
>> Thanks,
>> Ron.
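[Editor's note: for anyone hitting the same trace, this AbstractMethodError is the classic symptom of a binary-incompatible interface change. Spark 2.x changed its Java flat-map function interfaces so that call(...) returns an Iterator rather than an Iterable; a Crunch class compiled against the Spark 1.x interface (such as CrunchPairTuple2 in the 0.13/0.15 release jars) therefore never implements the Iterator-returning method the Spark 2.1 runtime invokes, which is why the trace shows a method descriptor ending in Ljava/util/Iterator;. A minimal sketch of the mechanism, using simplified hypothetical interfaces rather than the real Spark API (which also declares throws Exception):]

```java
import java.util.Arrays;
import java.util.Iterator;

// Spark 1.x shape (simplified): implementations return an Iterable.
interface FlatMapFunctionV1<T, R> {
    Iterable<R> call(T t);
}

// Spark 2.x shape (simplified): the same method now returns an Iterator.
interface FlatMapFunctionV2<T, R> {
    Iterator<R> call(T t);
}

public class AbstractMethodErrorSketch {
    public static void main(String[] args) {
        // A class compiled against V1 only provides the Iterable-returning
        // call(...). If that same bytecode is later linked against V2 and the
        // runtime invokes the Iterator-returning call(...), the JVM finds no
        // implementation and throws AbstractMethodError, as in the trace above.
        // (This single compilation unit can't reproduce the error itself,
        // since binary incompatibility needs separate compilation.)
        FlatMapFunctionV2<Integer, Integer> doubled =
                t -> Arrays.asList(t, t * 2).iterator();
        System.out.println(doubled.call(3).next()); // prints 3
    }
}
```

That is consistent with Josh's suggestion to build from master with CRUNCH-618 included: a Crunch build compiled against the Spark 2.x interfaces implements the method the runtime actually calls.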
