For better precision,
s/Or to be able to handle very large data sets ("big memory")/Or to be able to hold very large data sets in one place ("big memory")/g
--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen
On Tue, Oct 22, 2013 at 2:16 PM, Christopher Nguyen <[email protected]> wrote:
> Matt, it would be useful to back up one level to your problem statement.
> If it is strictly restricted as described, then you have a sequential
> problem that's not parallelizable. What is the primary design goal here? To
> complete the operation in the shortest time possible ("big compute")? Or to
> be able to handle very large data sets ("big memory")? Or to ensure that
> the operation completes in a fault-tolerant manner ("reliability")?
>
> There are two paths from here:
>
>    1. Finding parallelization opportunities: there may be a way to squint
> at the problem just right that lets you parallelize it (see the sketch
> after this list):
>       - Maybe you can come up with some algebra or approximations that
>       allow for associativity, so that different partitions of the data
>       can be operated on in parallel.
>       - Perhaps the data is a time series where weekly or monthly chunks
>       can be summarized in parallel and the sequential logic can be
>       brought up several hierarchical levels.
>       - Perhaps the visitor's state has only a finite memory of past
>       visits that you can take advantage of.
>    2. Finding alternatives: it's important to realize that Spark's
> strength is in "big compute" and not in "big memory". It covers only 1 of
> the 13 dwarfs of parallel computing patterns, the map-reduce,
> shared-nothing model (cf. D. Patterson et al., "A View from Berkeley ...",
> under "Monte Carlo"). It's a very successful model, but one that sometimes
> requires refactoring the algorithm/data to make it applicable. So if #1
> above isn't possible at all, you might look into a "big memory" approach,
> such as Tachyon, or memcached, or even just reading a big file
> sequentially and applying your visitor to each data row (a bare-bones
> sketch of that last option also follows the list), depending critically on
> what bottleneck you are engineering against.
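>
> To make (1) concrete, here is a rough sketch using Spark's Java API:
> summarize each partition in parallel, then merge the small summaries in
> partition order on the driver. The Summary class and its fields are purely
> illustrative; the approach only works if the visitor's state can be reduced
> to something that merges associatively.
>
>     import java.util.ArrayList;
>     import java.util.Collections;
>     import java.util.Iterator;
>     import java.util.List;
>
>     import org.apache.spark.api.java.JavaRDD;
>     import org.apache.spark.api.java.function.Function2;
>
>     import scala.Tuple2;
>
>     // Hypothetical partial summary whose merge is associative (here just count + sum).
>     class Summary implements java.io.Serializable {
>         long count;
>         double sum;
>         void add(double value) { count++; sum += value; }
>         void merge(Summary other) { count += other.count; sum += other.sum; }
>     }
>
>     class PartitionSummaries {
>         static Summary summarize(JavaRDD<Double> rows) {
>             // 1. Summarize each partition in parallel, tagged with its partition index.
>             Function2<Integer, Iterator<Double>, Iterator<Tuple2<Integer, Summary>>> summarizePartition =
>                 (idx, it) -> {
>                     Summary s = new Summary();
>                     while (it.hasNext()) {
>                         s.add(it.next());
>                     }
>                     return Collections.singletonList(new Tuple2<Integer, Summary>(idx, s)).iterator();
>                 };
>
>             // 2. Only the small per-partition summaries come back to the driver;
>             //    merge them in partition order to preserve the sequential logic.
>             List<Tuple2<Integer, Summary>> partials =
>                 new ArrayList<>(rows.mapPartitionsWithIndex(summarizePartition, false).collect());
>             partials.sort((a, b) -> Integer.compare(a._1(), b._1()));
>             Summary total = new Summary();
>             for (Tuple2<Integer, Summary> partial : partials) {
>                 total.merge(partial._2());
>             }
>             return total;
>         }
>     }
>
> This only helps, of course, if the visitor's logic can be expressed with
> such an associative merge; that is the "squinting" part.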
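>
> And, for completeness, a bare-bones sketch of the last alternative in (2):
> skip the cluster entirely and stream one big file through the stateful
> visitor on a single machine. The file format, path, and update rule here
> are placeholders.
>
>     import java.io.BufferedReader;
>     import java.io.IOException;
>     import java.nio.file.Files;
>     import java.nio.file.Paths;
>
>     class SequentialVisit {
>         public static void main(String[] args) throws IOException {
>             long state = 0L;  // stand-in for the visitor's internal state
>             // Stream the rows in order; memory use stays constant regardless of file size.
>             try (BufferedReader reader = Files.newBufferedReader(Paths.get(args[0]))) {
>                 String line;
>                 while ((line = reader.readLine()) != null) {
>                     // "visit(line)": an order-dependent update of the running state
>                     state = state * 31 + line.hashCode();
>                 }
>             }
>             System.out.println(state);
>         }
>     }
>
> The bottleneck then becomes single-machine I/O rather than memory, which
> may or may not be acceptable depending on your data volume.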
>
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <[email protected]> wrote:
>
>> Hi everyone,
>>
>> I have a driver holding a reference to an RDD. The driver would like to
>> "visit" each item in the RDD in order, say with a visitor object that
>> invokes visit(item) to modify that visitor's internal state. The visiting
>> is not commutative (e.g. Visiting item A then B makes a different internal
>> state from visiting item B then item A). Items in the RDD also are not
>> necessarily distinct.
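>>
>> For concreteness, the visitor is along these lines (the names and the
>> update rule are illustrative; the point is only that the state update is
>> order-dependent):
>>
>>     // Visiting 1 then 2 yields 33, while visiting 2 then 1 yields 63, so order matters.
>>     class OrderSensitiveVisitor implements java.io.Serializable {
>>         private long state = 0L;
>>
>>         void visit(long item) {
>>             state = state * 31 + item;  // next state depends on the current state
>>         }
>>
>>         long result() {
>>             return state;
>>         }
>>     }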
>>
>> I've looked into accumulators, which don't work because they require the
>> operation to be commutative. Collect() will not work because the RDD is
>> too large; in general, bringing the whole RDD into one partition won't
>> work for the same reason.
>>
>> Is it possible to iterate over the items in an RDD in order without
>> bringing the entire dataset into a single JVM at a time, and/or obtain
>> chunks of the RDD in order on the driver? We've tried using the internal
>> iterator() method. In some cases, we get a stack trace (running locally
>> with 3 threads). I've included the stack trace below.
>>
>> Thanks,
>>
>> -Matt Cheah
>>
>> org.apache.spark.SparkException: Error communicating with MapOutputTracker
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>> at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>> at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>> at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>> at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>> at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>> at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>> at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>> at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
>> at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>> at akka.dispatch.DefaultPromise.result(Future.scala:874)
>> at akka.dispatch.Await$.result(Future.scala:74)
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>> ... 46 more
>>
>>
>