In this context, I would be able to create a separate visitor for each 
partition. However, I'm looking for a way to use a single visitor object 
that walks over all partitions.

I suppose I could do this by using coalesce() to combine everything into one 
partition, but that would put too much data in memory for a single partition. 
Am I misinterpreting how to use these methods?
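
To make the goal concrete, here is a minimal sketch in plain Python (no Spark 
involved) of a single visitor walking every partition in order while only one 
partition is held at a time. The partitions are modeled as ordinary lists, and 
OrderVisitor, fetch_partition and visit_in_order are hypothetical names made up 
for illustration. How fetch_partition would map onto an actual Spark call (for 
example, a job restricted to one partition) is an assumption, not something 
confirmed in this thread.

```python
from typing import Iterator, List


class OrderVisitor:
    """Toy visitor whose internal state depends on the order of visits."""

    def __init__(self) -> None:
        self.state = 0

    def visit(self, item: int) -> None:
        # Appending a digit is order-sensitive: 1 then 2 gives 12, 2 then 1 gives 21.
        self.state = self.state * 10 + item


def fetch_partition(partitions: List[List[int]], index: int) -> Iterator[int]:
    """Hypothetical stand-in for pulling one partition's items to the driver."""
    return iter(partitions[index])


def visit_in_order(partitions: List[List[int]], visitor: OrderVisitor) -> OrderVisitor:
    # Walk the partitions by ascending index; only one partition is
    # materialized on the "driver" at any moment.
    for index in range(len(partitions)):
        for item in fetch_partition(partitions, index):
            visitor.visit(item)
    return visitor


partitions = [[1, 2], [3], [4, 5]]
visitor = visit_in_order(partitions, OrderVisitor())
reordered = visit_in_order([[4, 5], [3], [1, 2]], OrderVisitor())
```

The memory needed on the driver is bounded by the largest partition rather than 
by the whole dataset, at the cost of processing the partitions one after another.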

From: Mark Hamstra <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, October 22, 2013 12:51 PM
To: user <[email protected]>
Subject: Re: Visitor function to RDD elements

mapPartitions
mapPartitionsWithIndex

With care, you can use these and maintain the iteration order within 
partitions.  Beware, though, that any reduce functions need to be associative 
and commutative.
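
As a rough illustration of the per-partition approach, the plain-Python sketch 
below (no Spark involved) walks each partition in its own iteration order, tags 
the partial result with the partition index, and then merges the partial 
results by index. It assumes the visitor's state can be split into 
per-partition partial states whose merge is associative, even if not 
commutative; string concatenation stands in for such a merge. The function 
map_partitions_with_index here is a hypothetical stand-in operating on lists, 
not the Spark method of the same name.

```python
from typing import Callable, Iterator, List, Tuple


def map_partitions_with_index(
    partitions: List[List[str]],
    f: Callable[[int, Iterator[str]], Tuple[int, str]],
) -> List[Tuple[int, str]]:
    """Hypothetical stand-in: apply f(index, iterator) to each partition independently."""
    return [f(i, iter(part)) for i, part in enumerate(partitions)]


def summarize(index: int, items: Iterator[str]) -> Tuple[int, str]:
    # Visit one partition in its iteration order and return (index, partial state).
    state = ""
    for item in items:
        state += item
    return (index, state)


partitions = [["a", "b"], ["c"], ["d", "e"]]

# reversed() mimics partial results arriving out of partition order.
partials = list(reversed(map_partitions_with_index(partitions, summarize)))

# Restore partition order by sorting on the index, then merge sequentially.
final_state = ""
for _, partial in sorted(partials):
    final_state += partial
```

If the visitor's state cannot be decomposed this way, the per-partition results 
cannot be merged safely and the items have to be visited sequentially instead.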


On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <[email protected]> wrote:
Hi everyone,

I have a driver holding a reference to an RDD. The driver would like to "visit" 
each item in the RDD in order, say with a visitor object that invokes 
visit(item) to modify that visitor's internal state. The visiting is not 
commutative (e.g. visiting item A then item B produces a different internal 
state from visiting item B then item A). Items in the RDD are also not 
necessarily distinct.

I've looked into accumulators, which don't work because they require the 
operation to be commutative. collect() will not work because the RDD is too 
large; more generally, bringing the whole RDD into one partition won't work 
for the same reason.

Is it possible to iterate over the items in an RDD in order without bringing 
the entire dataset into a single JVM at once, and/or to obtain chunks of the 
RDD in order on the driver? We've tried using the internal iterator() method. 
In some cases it throws an exception (running locally with 3 threads). I've 
included the stack trace below.

Thanks,

-Matt Cheah

org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
at org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
at org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
... 46 more

