Thanks Josh... nailed it.
Are we planning to ship 1.0.0 soon? Need any help with that?
And now that it's running: I have also built the same pipeline directly in
Spark (without Crunch, using RDDs) and compared the DAGs Spark generated.
They look completely different, with Crunch adding many intermediate
transformations at each step (map, mapPartitions, and
mapPartitionsWithIndex, to name a few). Can you give me some insight into
how Crunch submits its jobs to Spark?
I am going to do some benchmarking, but will there be overhead from these
extra steps?
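
To make the question concrete, here is a toy sketch in plain Java (my own analogy, not Crunch or Spark internals) of how chained narrow steps such as map/mapPartitions can pipeline over a single iterator instead of materializing intermediate collections, which is why extra wrapper steps are often cheap:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class Pipelining {

    // Wrap an upstream iterator in a per-element transform, analogous to the
    // way chained narrow transformations in one Spark stage pass elements
    // through without materializing an intermediate collection.
    static <A, B> Iterator<B> mapped(Iterator<A> upstream, Function<A, B> f) {
        return new Iterator<B>() {
            public boolean hasNext() { return upstream.hasNext(); }
            public B next() { return f.apply(upstream.next()); }
        };
    }

    public static void main(String[] args) {
        Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();
        // Three chained "steps"; each element flows through all of them
        // one at a time, so no extra pass over the data is made.
        Iterator<Integer> out =
                mapped(mapped(mapped(source, x -> x + 1), x -> x * 10), x -> x - 5);
        List<Integer> result = new ArrayList<>();
        out.forEachRemaining(result::add);
        System.out.println(result); // [15, 25, 35]
    }
}
```

Wrapper layers like this mostly cost an extra virtual call per element, which is usually negligible next to I/O and shuffles; the benchmark should confirm either way.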
Thanks,
Ron.

On Fri, May 26, 2017 at 10:30 PM Josh Wills <[email protected]> wrote:

> Hey Ron,
>
> If I had to guess, I'd suspect you'll need to run against a version built
> from master that has these patches included:
>
> https://issues.apache.org/jira/browse/CRUNCH-618
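>
> A rough sketch of what building from master looks like (assuming the
> standard Maven build of the apache/crunch repository; skipping tests to
> save time):

```shell
git clone https://github.com/apache/crunch.git
cd crunch
mvn clean install -DskipTests
```

> and then point your project at the resulting SNAPSHOT artifacts in your
> local Maven repository.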
>
> Josh
>
> On Fri, May 26, 2017 at 12:27 PM, Ron Hashimshony <
> [email protected]> wrote:
>
>> Hi,
>> I have a pipeline that runs fine with MRPipeline. I tried to replace it
>> with SparkPipeline, and am getting AbstractMethodError.
>> This is the code (reduced to highlight the problem):
>>
>> Pipeline pipe = new SparkPipeline("local", "test");
>>
>> PCollection<siteInfo> siteInfoInput =
>>     pipe.read(From.avroFile(input, Avros.records(siteInfo.class)));
>>
>> PTable<Long, Integer> siteToPart = siteInfoInput.parallelDo(
>>     new MapFn<siteInfo, Pair<Long, Integer>>() {
>>         @Override
>>         public Pair<Long, Integer> map(siteInfo input) {
>>             return Pair.of(input.getSiteId(), input.getPartitionId());
>>         }
>>     }, Avros.tableOf(Avros.longs(), Avros.ints()));
>>
>> siteToPart.write(To.textFile(output));
>> pipe.done();
>>
>>
>> The exception is this:
>>
>> java.lang.AbstractMethodError: org.apache.crunch.impl.spark.fn.CrunchPairTuple2.call(Ljava/lang/Object;)Ljava/util/Iterator;
>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Tried with both Crunch 0.13.0 and 0.15.0, running against Spark 2.1.
>>
>> Thanks,
>> Ron.
>>
>
>
