Unfortunately, I think you're going to either have to fly a lot of data around or create a lot of garbage.
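
If you do go the route Patrick describes below (collecting one partition at a time at the driver), here is a rough, untested sketch of that lower-level approach. It runs SparkContext.runJob against one partition index at a time; Visitor is a placeholder type, and depending on your Spark version runJob may also require an allowLocal argument:

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for the order-sensitive visitor in this thread.
trait Visitor[T] { def visit(item: T): Unit }

// Pull one partition at a time back to the driver, in partition order.
// Only a single partition's data is resident in driver memory at once.
def visitInOrder[T: ClassTag](sc: SparkContext, rdd: RDD[T], visitor: Visitor[T]): Unit = {
  for (p <- 0 until rdd.partitions.length) {
    // Run a job against just partition p and collect its contents.
    val chunk: Array[T] = sc.runJob(rdd, (it: Iterator[T]) => it.toArray, Seq(p)).head
    chunk.foreach(visitor.visit)
  }
}

Each iteration pays a full scheduling round trip, and unless the RDD is cached, each job recomputes that partition from its lineage, so calling rdd.cache() first is probably worthwhile.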
On Tue, Oct 22, 2013 at 3:36 PM, Patrick Wendell <[email protected]> wrote:

> Hey Matt,
>
> It seems like you are trying to perform an operation that just isn't
> parallelizable. In that case, it's going to be tricky without collecting
> the entire dataset on one node.
>
> Spark does not expose an iterator like the one you are suggesting, which
> would let you traverse an RDD. You could build one yourself, though, by
> collecting one partition at a time at the driver; this would require some
> lower-level understanding of Spark.
>
> - Patrick
>
>
> On Tue, Oct 22, 2013 at 1:02 PM, Matt Cheah <[email protected]> wrote:
>
>> In this context, that would create a visitor mapping for each
>> partition. However, I'm looking for the ability to use a single visitor
>> object that will walk over all partitions.
>>
>> I suppose I could do this if I used coalesce() to combine everything
>> into one partition, but that's too much memory in one partition. Am I
>> misinterpreting how to use it?
>>
>> From: Mark Hamstra <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Tuesday, October 22, 2013 12:51 PM
>> To: user <[email protected]>
>> Subject: Re: Visitor function to RDD elements
>>
>> mapPartitions
>> mapPartitionsWithIndex
>>
>> With care, you can use these and maintain the iteration order within
>> partitions. Beware, though, that any reduce functions need to be
>> associative and commutative.
>>
>>
>> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <[email protected]> wrote:
>>
>>> Hi everyone,
>>>
>>> I have a driver holding a reference to an RDD. The driver would like
>>> to "visit" each item in the RDD in order, say with a visitor object that
>>> invokes visit(item) to modify that visitor's internal state. The visiting
>>> is not commutative (e.g. visiting item A then item B produces a different
>>> internal state from visiting item B then item A). Items in the RDD are
>>> also not necessarily distinct.
>>>
>>> I've looked into accumulators, which don't work because they require
>>> the operation to be commutative. collect() will not work because the RDD
>>> is too large; in general, bringing the whole RDD into one partition won't
>>> work, since the RDD is too large.
>>>
>>> Is it possible to iterate over the items in an RDD in order without
>>> bringing the entire dataset into a single JVM at a time, and/or to obtain
>>> chunks of the RDD in order on the driver? We've tried using the internal
>>> iterator() method. In some cases, we get a stack trace (running locally
>>> with 3 threads). I've included the stack trace below.
>>>
>>> Thanks,
>>>
>>> -Matt Cheah
>>>
>>> org.apache.spark.SparkException: Error communicating with MapOutputTracker
>>>   at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>>>   at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>>>   at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>>>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>   at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>>>   at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>>>   at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>>>   at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>>>   at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>>>   at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>   at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
>>>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>   at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>>>   at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>>>   at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
>>>   at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
>>>   at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
>>>   at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
>>>   at org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>>   at org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
>>>   at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
>>>   at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>>>   ... 46 more
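
For the archive, here is a minimal sketch of what Mark's mapPartitionsWithIndex suggestion could look like. Everything named here (PartialState, visit, merge) is hypothetical, and it rests on a real assumption: each partition's result must be mergeable with its neighbors' in left-to-right partition order, even though the merge need not be commutative. A visitor whose state cannot be split and recombined this way cannot use this route:

import org.apache.spark.rdd.RDD

// Hypothetical partial state: visit folds items in order; merge combines
// two states left-to-right and need not be commutative.
trait PartialState[T] extends Serializable {
  def visit(item: T): PartialState[T]
  def merge(next: PartialState[T]): PartialState[T]
}

def visitPerPartition[T](rdd: RDD[T], empty: () => PartialState[T]): PartialState[T] = {
  val partials = rdd
    .mapPartitionsWithIndex { (idx, it) =>
      // Fold this partition's items, in order, into a fresh state.
      Iterator((idx, it.foldLeft(empty())(_ visit _)))
    }
    .collect()      // one small state object per partition, not the raw data
    .sortBy(_._1)   // restore partition order on the driver
  partials.map(_._2).reduceLeft(_ merge _)
}

Unlike collect(), this ships only one small state object per partition back to the driver, so driver memory stays bounded even for a large RDD.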
