Thanks for the responses, everyone. My work partner Mingyu, who is cc'ed here, may 
be able to provide more context on the use case for the visitor.

The underlying issue, I think, is that while Spark is great for parallel computation, 
there will come a point after those computations where we eventually need to 
perform some kind of visiting operation. For example, suppose the cluster 
performs a decently sized parallel computation, and the result then needs to be 
streamed out in order to a listening socket. It seems like a lot of extra overhead 
for the RDD to be saved to disk first and read back out again just to get this 
sequential behavior.
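
To make that concrete, here is roughly what I have in mind (just a sketch, not our 
actual code; the host/port plumbing and the use of mapPartitionsWithIndex to pull 
one partition at a time back to the driver are my assumptions, and it costs one 
Spark job per partition):

import java.io.PrintWriter
import java.net.Socket

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD

// Sketch only: stream the result of a parallel computation to a socket in
// order, pulling one partition at a time back to the driver instead of
// collect()-ing everything or writing it to disk first. Assumes the RDD's
// partition order already matches the desired output order (e.g. after a sort).
def streamInOrder[T: ClassTag](rdd: RDD[T], host: String, port: Int): Unit = {
  val socket = new Socket(host, port)
  val out = new PrintWriter(socket.getOutputStream, true)
  try {
    for (p <- 0 until rdd.partitions.length) {
      // Fetch just partition p to the driver: one job per partition, but only
      // one partition's worth of data in driver memory at a time.
      val chunk: Array[T] = rdd
        .mapPartitionsWithIndex((i, it) => if (i == p) it else Iterator.empty)
        .collect()
      chunk.foreach(item => out.println(item))
    }
  } finally {
    out.close()
    socket.close()
  }
}

The point is that only one partition's worth of data would sit in driver memory at 
any moment, and nothing has to round-trip through the filesystem just to be 
replayed sequentially.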

I appreciate the discussion though. Quite enlightening.

Thanks,

-Matt Cheah

From: Christopher Nguyen <c...@adatao.com>
Date: Tuesday, October 22, 2013 2:23 PM
To: "user@spark.incubator.apache.org" <user@spark.incubator.apache.org>, Andrew Winings <mch...@palantir.com>
Cc: Mingyu Kim <m...@palantir.com>
Subject: Re: Visitor function to RDD elements

For better precision,

s/Or to be able to handle very large data sets ("big memory")/Or to be able to 
hold very large data sets in one place ("big memory")/g

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, Oct 22, 2013 at 2:16 PM, Christopher Nguyen 
<c...@adatao.com> wrote:
Matt, it would be useful to back up one level to your problem statement. If the 
problem is strictly as described, then you have a sequential problem that's 
not parallelizable. What is the primary design goal here? To complete the 
operation in the shortest time possible ("big compute")? Or to be able to 
handle very large data sets ("big memory")? Or to ensure that the operation 
completes in a fault-tolerant manner ("reliability")?

There are two paths from here:

  1.  Finding parallelizable opportunities: there may be a way to squint at the 
problem in just the right way that makes it parallelizable (see the sketch after 
this list):
     *   Maybe you can come up with some algebra or approximations that allow 
for associativity, so that different partitions of the data can be operated on 
in parallel.
     *   Perhaps the data is a time series where weekly or monthly chunks can 
be summarized in parallel, and the sequential logic can be brought up several 
hierarchical levels.
     *   Perhaps the statefulness of the visitor has a finite memory of past 
visits that you can take advantage of.
  2.  Finding alternatives: it's important to realize that Spark's strength is 
in "big compute", not "big memory". It implements only 1 of the 13 dwarfs of 
parallel computing patterns, the map-reduce, shared-nothing model (cf. D. 
Patterson et al., "A View From Berkeley ...", under "Monte Carlo"). It's a very 
successful model, but one that sometimes requires refactoring the 
algorithm/data to make it applicable. So if #1 above isn't possible at all, you 
might look into a "big memory" approach, such as Tachyon, or memcached, or even 
just reading a big file sequentially and applying your visitor to each data 
row, depending critically on which bottleneck you are engineering against.
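
As a rough illustration of path #1, something like the sketch below may apply 
(the Summary type and the function names are made up, and it assumes your 
visitor's state really does decompose into a parallel per-partition summary plus 
a small sequential merge):

import org.apache.spark.rdd.RDD

// Hypothetical per-partition summary type; the real shape depends on whatever
// algebra or approximation you can find for the visitor's state.
case class Summary(count: Long, digest: String)

// Sketch of path #1: do the heavy, parallelizable work per partition on the
// executors, then run the order-sensitive (sequential) logic only over the
// small per-partition summaries, in partition order, on the driver.
def summarizeThenVisit(
    rdd: RDD[String],
    summarize: Iterator[String] => Summary,  // runs in parallel on executors
    visitInOrder: Seq[Summary] => Unit       // runs sequentially on the driver
): Unit = {
  // One small Summary per partition; only the summaries cross the network.
  val summaries: Array[Summary] =
    rdd.mapPartitions(it => Iterator(summarize(it))).collect()

  // collect() concatenates partition results in partition-index order, so the
  // sequential step sees the summaries in the same order as the data.
  visitInOrder(summaries)
}

Whether this helps depends entirely on finding that decomposition, which is the 
hard part.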

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah 
<mch...@palantir.com> wrote:
Hi everyone,

I have a driver holding a reference to an RDD. The driver would like to "visit" 
each item in the RDD in order, say with a visitor object that invokes 
visit(item) to modify the visitor's internal state. The visiting is not 
commutative (e.g. visiting item A then item B produces a different internal state 
than visiting item B then item A). Items in the RDD are also not necessarily 
distinct.

I've looked into accumulators, which don't work because they require the 
operation to be commutative. collect() won't work because the RDD is too 
large; in general, bringing the whole RDD into a single partition won't work for 
the same reason.
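
For reference, the shape of what we're trying to do is roughly the following 
(heavily simplified; the Visitor trait and applyVisitor are placeholders for 
illustration, not our real classes):

import org.apache.spark.rdd.RDD

// Simplified picture of the access pattern we need: a stateful, order-sensitive
// visitor applied to every element of the RDD, in order, within a single JVM.
trait Visitor[T] {
  def visit(item: T): Unit  // mutates the visitor's internal state
}

def applyVisitor[T](rdd: RDD[T], visitor: Visitor[T]): Unit = {
  // rdd.collect().foreach(item => visitor.visit(item))  // correct, but the RDD is too large
  // What we want instead: pull the elements (or chunks of them) to the driver
  // in order, without ever materializing the whole dataset in one JVM.
  ???
}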

Is it possible to iterate over the items in an RDD in order without bringing 
the entire dataset into a single JVM at a time, and/or obtain chunks of the RDD 
in order on the driver? We've tried using the internal iterator() method. In 
some cases, we get a stack trace (running locally with 3 threads). I've 
included the stack trace below.

Thanks,

-Matt Cheah

org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
at org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
at org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
... 46 more


