Hi,
I have a pipeline that runs fine with MRPipeline. I tried replacing it
with SparkPipeline, and now I get an AbstractMethodError.
This is the code (reduced to highlight the problem):
Pipeline pipe = new SparkPipeline("local", "test");
PCollection<siteInfo> siteInfoInput = pipe.read(
    From.avroFile(input, Avros.records(siteInfo.class)));
PTable<Long, Integer> siteToPart = siteInfoInput.parallelDo(
    new MapFn<siteInfo, Pair<Long, Integer>>() {
      @Override
      public Pair<Long, Integer> map(siteInfo input) {
        return Pair.of(input.getSiteId(), input.getPartitionId());
      }
    }, Avros.tableOf(Avros.longs(), Avros.ints()));
siteToPart.write(To.textFile(output));
pipe.done();
The exception is this:
java.lang.AbstractMethodError: org.apache.crunch.impl.spark.fn.CrunchPairTuple2.call(Ljava/lang/Object;)Ljava/util/Iterator;
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
I tried this with both Crunch 0.13.0 and 0.15.0, running against Spark 2.1.
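One guess at the cause (an assumption on my part, not verified against the Crunch source): Spark 2.0 changed FlatMapFunction#call and PairFlatMapFunction#call to return a java.util.Iterator instead of a java.lang.Iterable, so a Crunch build compiled against the Spark 1.x interfaces would lack the Iterator-returning method the Spark 2.x runtime invokes, which matches the `)Ljava/util/Iterator;` descriptor in the error. A minimal self-contained sketch of that mismatch, using hypothetical stand-in interfaces (no Spark dependency):

```java
import java.util.Arrays;
import java.util.Iterator;

public class AbstractMethodErrorDemo {
    // Stand-in for the Spark 1.x shape: call() returns an Iterable.
    interface FlatMapFn1x<T, R> {
        Iterable<R> call(T t) throws Exception;
    }

    // Stand-in for the Spark 2.x shape: call() returns an Iterator.
    interface FlatMapFn2x<T, R> {
        Iterator<R> call(T t) throws Exception;
    }

    public static void main(String[] args) throws Exception {
        // A class compiled against the 1.x interface implements only the
        // Iterable-returning call()...
        FlatMapFn1x<String, String> oldStyle = s -> Arrays.asList(s.split(","));
        for (String x : oldStyle.call("a,b")) System.out.println(x);

        // ...but a 2.x runtime invokes the Iterator-returning call(). When the
        // implementing class file predates that signature, the JVM finds no
        // implementation and throws AbstractMethodError. (Triggering the real
        // error requires mixing class files compiled against different
        // versions; here the two shapes are just shown side by side.)
        FlatMapFn2x<String, String> newStyle =
            s -> Arrays.asList(s.split(",")).iterator();
        Iterator<String> it = newStyle.call("c,d");
        while (it.hasNext()) System.out.println(it.next());
    }
}
```

If that's right, the fix would presumably be a Crunch build compiled against Spark 2.x rather than a change on my side, but I'd appreciate confirmation.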
Thanks,
Ron.