Hey Matt,

It seems like you are trying to perform an operation that just isn't parallelizable. In that case, it's going to be tricky without collecting the entire dataset on one node.
Spark does not expose an iterator of the kind you are suggesting, one that lets you traverse an RDD from the driver. You could, however, build one yourself by collecting one partition at a time at the driver, though this would require some lower-level understanding of Spark; a rough sketch follows below the quoted thread.

- Patrick

On Tue, Oct 22, 2013 at 1:02 PM, Matt Cheah <[email protected]> wrote:

> In this context, it would be able to create a visitor mapping for each
> partition. However, I'm looking for the ability to use a single visitor
> object that will walk over all partitions.
>
> I suppose I could do this if I used coalesce() to combine everything into
> one partition, but that would be too much memory in one partition. Am I
> misinterpreting how to use it?
>
> From: Mark Hamstra <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Tuesday, October 22, 2013 12:51 PM
> To: user <[email protected]>
> Subject: Re: Visitor function to RDD elements
>
> mapPartitions
> mapPartitionsWithIndex
>
> With care, you can use these and maintain the iteration order within
> partitions. Beware, though, that any reduce functions need to be
> associative and commutative.
>
>
> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <[email protected]> wrote:
>
>> Hi everyone,
>>
>> I have a driver holding a reference to an RDD. The driver would like to
>> "visit" each item in the RDD in order, say with a visitor object that
>> invokes visit(item) to modify the visitor's internal state. The visiting
>> is not commutative (e.g. visiting item A and then item B produces a
>> different internal state than visiting item B and then item A). Items in
>> the RDD are also not necessarily distinct.
>>
>> I've looked into accumulators, which don't work because they require the
>> operation to be commutative. collect() will not work because the RDD is
>> too large; in general, bringing the whole RDD into one partition won't
>> work, since the RDD is too large.
>>
>> Is it possible to iterate over the items in an RDD in order without
>> bringing the entire dataset into a single JVM at a time, and/or to obtain
>> chunks of the RDD in order on the driver? We've tried using the internal
>> iterator() method. In some cases, we get a stack trace (running locally
>> with 3 threads). I've included the stack trace below.
>>
>> Thanks,
>>
>> -Matt Cheah
>>
>> org.apache.spark.SparkException: Error communicating with MapOutputTracker
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>> at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>> at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>> at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>> at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>> at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>> at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>> at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>> at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
>> at org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
>> at org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
>> at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>> ... 46 more
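
Here is a rough, untested sketch of the partition-at-a-time approach in Scala. To be clear about what is and isn't real here: SparkContext.runJob is a real (semi-internal) Spark API, while visitInOrder, sortedRdd, and the visitor are made-up names for illustration. On recent releases runJob(rdd, func, partitions) has the shape used below; on the 0.8 line it also takes a trailing allowLocal flag and uses ClassManifest in place of ClassTag. The idea is to run one small job per partition, pull just that partition's elements back to the driver, and hand them to the visitor in partition order:

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper, not an existing Spark API: walk an RDD's elements
// in partition order on the driver, holding only one partition's worth
// of data in driver memory at a time.
def visitInOrder[T: ClassTag](rdd: RDD[T])(visit: T => Unit): Unit = {
  val sc: SparkContext = rdd.context
  for (p <- 0 until rdd.partitions.length) {
    // Run a job on just partition p. runJob returns one result per
    // requested partition, so the result array has a single element.
    val chunk = sc.runJob(rdd, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
    chunk.foreach(visit)
  }
}

// Example usage with a stateful visitor object:
//   visitInOrder(sortedRdd) { item => visitor.visit(item) }

Two caveats. Each call to runJob schedules a separate job, so you are trading scheduling overhead for bounded driver memory; if the RDD is expensive to compute, cache() it first so the per-partition jobs don't each recompute the lineage. And partition index order only matches element order when the RDD is range-partitioned, e.g. the output of sortByKey, which sounds like your sorted-table case.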
