Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-04 Thread Aaron Davidson
ConnectionManager has been deprecated and is no longer used by default (NettyBlockTransferService is the replacement). Hopefully you would no longer see these messages unless you have explicitly flipped it back on. On Tue, Aug 4, 2015 at 6:14 PM, Jim Green wrote: > And also https://issues.apache

Re: S3 vs HDFS

2015-07-11 Thread Aaron Davidson
Note that if you use multi-part upload, each part becomes 1 block, which allows for multiple concurrent readers. One would typically use fixed-size block sizes which align with Spark's default HDFS block size (64 MB, I think) to ensure the reads are aligned. On Sat, Jul 11, 2015 at 11:14 AM, Steve
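
A hedged sketch of aligning the two, assuming the s3n connector (the property name comes from Hadoop's NativeS3FileSystem; verify it against your Hadoop version):

    // Make S3 report 64 MB blocks so read splits line up with 64 MB
    // multi-part upload parts. Set this before reading the file.
    sc.hadoopConfiguration.set("fs.s3n.block.size", (64 * 1024 * 1024).toString)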

Re: All master are unreponsive issue

2015-07-05 Thread Aaron Davidson
Are you seeing this after the app has already been running for some time, or just at the beginning? Generally, registration should only occur once initially, and a timeout would be due to the master not being accessible. Try telnetting to the master IP/port from the machine on which the driver will

Re: s3 bucket access/read file

2015-07-01 Thread Aaron Davidson
I think 2.6 failed to abruptly close streams that weren't fully read, which we observed as a huge performance hit. We had to backport the 2.7 improvements before being able to use it.

Re: s3 bucket access/read file

2015-06-30 Thread Aaron Davidson
You should be able to use s3a (on new Hadoop versions); I believe it will try v4 signing, or at least has a setting for it. On Tue, Jun 30, 2015 at 8:31 PM, Exie wrote: > Not sure if this helps, but the options I set are slightly different: > > val hadoopConf=sc.hadoopConfiguration > hadoopConf.set("fs.s3n.aws
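
A minimal sketch of switching to s3a, assuming Hadoop 2.6+ with the s3a connector on the classpath (bucket and credentials are placeholders):

    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY") // placeholder credentials
    hadoopConf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
    val lines = sc.textFile("s3a://my-bucket/path/to/file") // note the s3a:// scheme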

RE: ReduceByKey with a byte array as the key

2015-06-11 Thread Aaron Davidson
Be careful shoving arbitrary binary data into a string; invalid UTF characters can cause significant computational overhead in my experience. On Jun 11, 2015 10:09 AM, "Mark Tse" wrote: > Makes sense – I suspect what you suggested should work. > > > > However, I think the overhead between this a
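
One hedged alternative to a String key is a wrapper with structural equality; a sketch, where pairs is a hypothetical RDD[(Array[Byte], Int)]:

    // Seq (a WrappedArray here) has value-based equals/hashCode, so equal
    // byte arrays land on the same reduce key without a lossy String detour.
    case class BytesKey(bytes: Seq[Byte])
    val counts = pairs
      .map { case (k, v) => (BytesKey(k.toSeq), v) }
      .reduceByKey(_ + _)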

Re: RDD resiliency -- does it keep state?

2015-03-28 Thread Aaron Davidson
Note that speculation is off by default to avoid these kinds of unexpected issues. On Sat, Mar 28, 2015 at 6:21 AM, Steve Loughran wrote: > > It's worth adding that there's no guarantee that re-evaluated work would > be on the same host as before, and in the case of node failure, it is not > gu

Re: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-17 Thread Aaron Davidson
Actually, this is the more relevant JIRA (which is resolved): https://issues.apache.org/jira/browse/SPARK-3595 6352 is about saveAsParquetFile, which is not in use here. Here is a DirectOutputCommitter implementation: https://gist.github.com/aarondav/c513916e72101bbe14ec and it can be configured

Re: Having lots of FetchFailedException in join

2015-03-05 Thread Aaron Davidson
> > > > > > Jianshi > > > > > > > > On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai > wrote: > > Hi Jianshi, > > > > From my understanding, it may not be the problem of NIO or Netty, looking > at your stack trace, the OOM is occ

Re: Which OutputCommitter to use for S3?

2015-03-05 Thread Aaron Davidson
one runs into the same problem I had. > >> > >> By setting --hadoop-major-version=2 when using the ec2 scripts, > >> everything worked fine. > >> > >> Darin. > >> > >> > >> - Original Message - > >> From: Darin McBeath &

Re: Having lots of FetchFailedException in join

2015-03-03 Thread Aaron Davidson
ChannelHandlerContext.java:319) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) > at > io.netty.channel.nio.NioEventLoop.

Re: Having lots of FetchFailedException in join

2015-03-03 Thread Aaron Davidson
"Failed to connect" implies that the executor at that host died, please check its logs as well. On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang wrote: > Sorry that I forgot the subject. > > And in the driver, I got many FetchFailedException. The error messages are > > 15/03/03 10:34:32 WARN TaskS

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Aaron Davidson
All stated symptoms are consistent with GC pressure (other nodes timeout trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spil

Re: Worker and Nodes

2015-02-21 Thread Aaron Davidson
Note that the parallelism (i.e., number of partitions) is just an upper bound on how much of the work can be done in parallel. If you have 200 partitions, then you can divide the work among anywhere from 1 to 200 cores and all resources will remain utilized. If you have more than 200 cores, though, then

Re: Which OutputCommitter to use for S3?

2015-02-21 Thread Aaron Davidson
Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec You can use it by setting "mapred.output.committer.class" in the Hadoop configuration (or "spark.hadoop.mapred.output.committer.class" in the Spark configuration). Note that this only works for the old Hadoop APIs, I believe
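
A minimal sketch of wiring that up; the committer class name below is a placeholder for the gist's implementation:

    import org.apache.spark.{SparkConf, SparkContext}
    val conf = new SparkConf()
      .set("spark.hadoop.mapred.output.committer.class",
           "com.example.DirectOutputCommitter") // placeholder class name
    val sc = new SparkContext(conf)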

Re: RangePartitioner in Spark 1.2.1

2015-02-17 Thread Aaron Davidson
RangePartitioner does not actually provide a guarantee that all partitions will be equal sized (that is hard), and instead uses sampling to approximate equal buckets. Thus, it is possible that a bucket is left empty. If you want the specified behavior, you should define your own partitioner. It wo
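
A sketch of such a partitioner for dense integer keys in [0, numElements), guaranteeing evenly sized buckets (names hypothetical):

    import org.apache.spark.Partitioner
    class ExactPartitioner(partitions: Int, numElements: Int) extends Partitioner {
      override def numPartitions: Int = partitions
      override def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[Int]
        (k.toLong * partitions / numElements).toInt // spread keys evenly
      }
    }
    // usage: rdd.partitionBy(new ExactPartitioner(10, numElements))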

Re: Shuffle write increases in spark 1.2

2015-02-15 Thread Aaron Davidson
I think Xuefeng Wu's suggestion is likely correct. This difference is more likely explained by the compression library changing versions than by sort vs. hash shuffle (which should not affect output size significantly). Others have reported that switching to lz4 fixed their issue. We should document th

Re: Shuffle read/write issue in spark 1.2

2015-02-06 Thread Aaron Davidson
Did the problem go away when you switched to lz4? There was a change in the default compression codec from 1.0 to 1.1, where we went from LZF to Snappy. I don't think there was any such change from 1.1 to 1.2, though. On Fri, Feb 6, 2015 at 12:17 AM, Praveen Garg wrote: > We tried changing the

Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-04 Thread Aaron Davidson
The latter would be faster. With S3, you want to maximize number of concurrent readers until you hit your network throughput limits. On Wed, Feb 4, 2015 at 6:20 AM, Peter Rudenko wrote: > Hi if i have a 10GB file on s3 and set 10 partitions, would it be > download whole file on master first and

Re: 2GB limit for partitions?

2015-02-03 Thread Aaron Davidson
To be clear, there is no distinction between partitions and blocks for RDD caching (each RDD partition corresponds to 1 cache block). The distinction is important for shuffling, where by definition N partitions are shuffled into M partitions, creating N*M intermediate blocks. Each of these blocks m

Re: Duplicate key when sorting BytesWritable with Kryo?

2015-01-30 Thread Aaron Davidson
Ah, this is in particular an issue due to sort-based shuffle (it was not the case for hash-based shuffle, which would immediately serialize each record rather than holding many in memory at once). The documentation should be updated. On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza wrote: > Hi Andre

Re: performance of saveAsTextFile moving files from _temporary

2015-01-28 Thread Aaron Davidson
> code logs, but the job sits there as the moving of files happens. > > On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson > wrote: > >> This renaming from _temporary to the final location is actually done by >> executors, in parallel, for saveAsTextFile. It should be perfor

Re: performance of saveAsTextFile moving files from _temporary

2015-01-27 Thread Aaron Davidson
This renaming from _temporary to the final location is actually done by executors, in parallel, for saveAsTextFile. It should be performed by each task individually before it returns. I have seen an issue similar to what you mention dealing with Hive code which did the renaming serially on the dri

Re: Lost task - connection closed

2015-01-26 Thread Aaron Davidson
It looks like something weird is going on with your object serialization, perhaps a funny form of self-reference which is not detected by ObjectOutputStream's typical loop avoidance. That, or you have some data structure like a linked list with a parent pointer and you have many thousand elements.

Re: Lost task - connection closed

2015-01-25 Thread Aaron Davidson
Please take a look at the executor logs (on both sides of the IOException) to see if there are other exceptions (e.g., OOM) which precede this one. Generally, the connections should not fail spontaneously. On Sun, Jan 25, 2015 at 10:35 PM, octavian.ganea wrote: > Hi, > > I am running a program t

Re: Spark 1.2 – How to change Default (Random) port ….

2015-01-25 Thread Aaron Davidson
This was a regression caused by Netty Block Transfer Service. The fix for this just barely missed the 1.2 release, and you can see the associated JIRA here: https://issues.apache.org/jira/browse/SPARK-4837 Current master has the fix, and the Spark 1.2.1 release will have it included. If you don't

Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Aaron Davidson
Spark's network-common package depends on guava as a "provided" dependency in order to avoid conflicting with other libraries (e.g., Hadoop) that depend on specific versions. com/google/common/base/Preconditions has been present in Guava since version 2, so this is likely a "dependency not found" r

Re: Serializability: for vs. while loops

2015-01-15 Thread Aaron Davidson
Scala for-loops are implemented as closures using anonymous inner classes which are instantiated once and invoked many times. This means, though, that the code inside the loop is actually sitting inside a class, which confuses Spark's Closure Cleaner, whose job is to remove unused references from c
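
A hedged sketch of the difference, with rdd a hypothetical RDD[Int]:

    // The for-comprehension compiles to an anonymous class, which can drag
    // the enclosing object into the serialized closure:
    for (i <- 1 to 3) {
      rdd.map(_ + i).count()
    }
    // The while-loop body stays in the enclosing method; capture a local val:
    var i = 1
    while (i <= 3) {
      val j = i // the closure captures only j
      rdd.map(_ + j).count()
      i += 1
    }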

Re: use netty shuffle for network cause high gc time

2015-01-13 Thread Aaron Davidson
What version are you running? I think "spark.shuffle.use.netty" was a valid option only in Spark 1.1, where the Netty stuff was strictly experimental. Spark 1.2 contains an officially supported and much more thoroughly tested version under the property "spark.shuffle.blockTransferService", which is
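
For reference, a sketch of both settings (the first is only meaningful on 1.1, the second on 1.2):

    // Spark 1.1 (experimental Netty path): spark.shuffle.use.netty=true
    // Spark 1.2 (officially supported):
    val conf = new org.apache.spark.SparkConf()
      .set("spark.shuffle.blockTransferService", "netty") // or "nio" to revert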

Re: FileNotFoundException in appcache shuffle files

2015-01-10 Thread Aaron Davidson
As Jerry said, this is not related to shuffle file consolidation. The unique thing about this problem is that it's failing to find a file while trying to _write_ to it, in append mode. The simplest explanation for this would be that the file is deleted in between some check for existence and openi

Re: ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId

2015-01-08 Thread Aaron Davidson
Do note that this problem may be fixed in Spark 1.2, as we changed the default transfer service to use a Netty-based one rather than the ConnectionManager. On Thu, Jan 8, 2015 at 7:05 AM, Spidy wrote: > Hi, > > Can you please explain which settings did you changed? > > > > -- > View this message

Re: RDDs being cleaned too fast

2014-12-10 Thread Aaron Davidson
The ContextCleaner uncaches RDDs that have gone out of scope on the driver. So it's possible that the given RDD is no longer reachable in your program's control flow, or else it'd be a bug in the ContextCleaner. On Wed, Dec 10, 2014 at 5:34 PM, ankits wrote: > I'm using spark 1.1.0 and am seeing

Re: Running two different Spark jobs vs multi-threading RDDs

2014-12-06 Thread Aaron Davidson
You can actually submit multiple jobs to a single SparkContext in different threads. In the case you mentioned with 2 stages having a common parent, both will wait for the parent stage to complete and then the two will execute in parallel, sharing the cluster resources. Solutions that submit multi

Re: Announcing Spark 1.1.1!

2014-12-03 Thread Aaron Davidson
Because this was a maintenance release, we should not have introduced any binary backwards or forwards incompatibilities. Therefore, applications that were written and compiled against 1.1.0 should still work against a 1.1.1 cluster, and vice versa. On Wed, Dec 3, 2014 at 1:30 PM, Andrew Or wrote

Re: S3NativeFileSystem inefficient implementation when calling sc.textFile

2014-11-30 Thread Aaron Davidson
new s3a filesystem in Hadoop 2.6.0 [1]. > > 1. > https://issues.apache.org/jira/plugins/servlet/mobile#issue/HADOOP-10400 > On Nov 26, 2014 12:24 PM, "Aaron Davidson" wrote: > >> Spark has a known problem where it will do a pass of metadata on a large >> num

Re: S3NativeFileSystem inefficient implementation when calling sc.textFile

2014-11-26 Thread Aaron Davidson
Spark has a known problem where it will do a pass of metadata on a large number of small files serially, in order to find the partition information prior to starting the job. This will probably not be repaired by switching the FS impl. However, you can change the FS being used like so (prior to th

Re: Bug in Accumulators...

2014-11-23 Thread Aaron Davidson
As Mohit said, making Main extend Serializable should fix this example. In general, it's not a bad idea to mark the fields you don't want to serialize (e.g., sc and conf in this case) as @transient as well, though this is not the issue in this case. Note that this problem would not have arisen in
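
A minimal sketch of the suggested shape (names hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}
    object Main extends Serializable {
      // @transient keeps these fields out of any serialized task closures.
      @transient lazy val conf = new SparkConf().setAppName("accum-example")
      @transient lazy val sc   = new SparkContext(conf)

      def main(args: Array[String]): Unit = {
        val accum = sc.accumulator(0)
        sc.parallelize(1 to 100).foreach(x => accum += x)
        println(accum.value) // 5050
      }
    }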

Re: Given multiple .filter()'s, is there a way to set the order?

2014-11-14 Thread Aaron Davidson
In the situation you show, Spark will pipeline each filter together, and will apply each filter one at a time to each row, effectively constructing an "&&" statement. You would only see a performance difference if the filter code itself is somewhat expensive, then you would want to only execute it
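
In other words, the two forms below should perform about the same unless the predicates themselves are expensive (lines is a hypothetical RDD[String], predicates are placeholders):

    def isWellFormed(s: String): Boolean = s.nonEmpty        // placeholder predicate
    def matchesPattern(s: String): Boolean = s.contains("x") // placeholder predicate
    val chained = lines.filter(isWellFormed).filter(matchesPattern)
    // Pipelined per row, this behaves like a single fused predicate:
    val fused = lines.filter(s => isWellFormed(s) && matchesPattern(s))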

Re: data locality, task distribution

2014-11-13 Thread Aaron Davidson
@oculusinfo.com> wrote: >> >>> Sorry, I think I was not clear in what I meant. >>> I didn't mean it went down within a run, with the same instance. >>> >>> I meant I'd run the whole app, and one time, it would cache 100%, and >>> the next run,

Re: data locality, task distribution

2014-11-12 Thread Aaron Davidson
n what I meant. > I didn't mean it went down within a run, with the same instance. > > I meant I'd run the whole app, and one time, it would cache 100%, and the > next run, it might cache only 83% > > Within a run, it doesn't change. > > On Wed, Nov 12,

Re: data locality, task distribution

2014-11-12 Thread Aaron Davidson
The fact that the caching percentage went down is highly suspicious. It should generally not decrease unless other cached data took its place, or unless executors were dying. Do you know if either of these were the case? On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld < nkronenf...@oculusinf

Re: Does spark works on multicore systems?

2014-11-08 Thread Aaron Davidson
oops, meant to cc userlist too On Sat, Nov 8, 2014 at 3:13 PM, Aaron Davidson wrote: > The default local master is "local[*]", which should use all cores on your > system. So you should be able to just do "./bin/pyspark" and > "sc.parallelize(range(1000)).co

Re: Bug in Accumulators...

2014-11-07 Thread Aaron Davidson
This may be due in part to Scala allocating an anonymous inner class in order to execute the for loop. I would expect if you change it to a while loop like var i = 0 while (i < 10) { sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) i += 1 } then the problem may go away. I am not sup

Re: Spark speed performance

2014-11-01 Thread Aaron Davidson
coalesce() is a streaming operation if used without the second parameter; it does not put all the data in RAM. If used with the second parameter (shuffle = true), then it performs a shuffle, but still does not put all the data in RAM. On Sat, Nov 1, 2014 at 12:09 PM, wrote: > Now I am getting to
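
A sketch of both forms:

    val fewer    = rdd.coalesce(10)                 // streamed, no shuffle
    val balanced = rdd.coalesce(10, shuffle = true) // shuffles; same as repartition(10)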

Re: Getting spark to use more than 4 cores on Amazon EC2

2014-10-22 Thread Aaron Davidson
Another wild guess, if your data is stored in S3, you might be running into an issue where the default jets3t properties limits the number of parallel S3 connections to 4. Consider increasing the max-thread-counts from here: http://www.jets3t.org/toolkit/configuration.html. On Tue, Oct 21, 2014 at

Re: Shuffle issues in the current master

2014-10-22 Thread Aaron Davidson
You may be running into this issue: https://issues.apache.org/jira/browse/SPARK-4019 You could check by having 2000 or fewer reduce partitions. On Wed, Oct 22, 2014 at 1:48 PM, DB Tsai wrote: > PS, sorry for spamming the mailing list. Based my knowledge, both > spark.shuffle.spill.compress and

Re: input split size

2014-10-18 Thread Aaron Davidson
The "minPartitions" argument of textFile/hadoopFile cannot decrease the number of splits past the physical number of blocks/files. So if you have 3 HDFS blocks, asking for 2 minPartitions will still give you 3 partitions (hence the "min"). It can, however, convert a file with fewer HDFS blocks into

Re: Getting the type of an RDD in spark AND pyspark

2014-09-06 Thread Aaron Davidson
Pretty easy to do in Scala: rdd.elementClassTag.runtimeClass You can access this method from Python as well by using the internal _jrdd. It would look something like this (warning, I have not tested it): rdd._jrdd.classTag().runtimeClass() (The method name is "classTag" for JavaRDDLike, and "ele

Re: question on replicate() in blockManager.scala

2014-09-06 Thread Aaron Davidson
Looks like that's BlockManagerWorker.syncPutBlock(), which is in an if check, perhaps obscuring its existence. On Fri, Sep 5, 2014 at 2:19 AM, rapelly kartheek wrote: > Hi, > > var cachedPeers: Seq[BlockManagerId] = null > private def replicate(blockId: String, data: ByteBuffer, level: > Stor

Re: error: type mismatch while Union

2014-09-06 Thread Aaron Davidson
Are you doing this from the spark-shell? You're probably running into https://issues.apache.org/jira/browse/SPARK-1199 which should be fixed in 1.1. On Sat, Sep 6, 2014 at 3:03 AM, Dhimant wrote: > I am using Spark version 1.0.2 > > > > > -- > View this message in context: > http://apache-spark

Re: How to change the values in Array of Bytes

2014-09-06 Thread Aaron Davidson
More of a Scala question than Spark, but "apply" here can be written with just parentheses like this: val array = Array.fill[Byte](10)(0) if (array(index) == 0) { array(index) = 1 } The second instance of "array(index) = 1" is actually not calling apply, but "update". It's a scala-ism that's us

Re: Stage failure in BlockManager due to FileNotFoundException on long-running streaming job

2014-08-20 Thread Aaron Davidson
This is likely due to a bug in shuffle file consolidation (which you have enabled) which was hopefully fixed in 1.1 with this patch: https://github.com/apache/spark/commit/78f2af582286b81e6dc9fa9d455ed2b369d933bd Until 1.0.3 or 1.1 are released, the simplest solution is to disable spark.shuffle.co
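
A sketch of the workaround, applied before the SparkContext is created:

    val conf = new org.apache.spark.SparkConf()
      .set("spark.shuffle.consolidateFiles", "false") // revert to one file per map task
    val sc = new org.apache.spark.SparkContext(conf)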

Re: Spark: why need a masterLock when sending heartbeat to master

2014-08-17 Thread Aaron Davidson
Yes, good point, I believe the masterLock is now unnecessary altogether. The reason for its initial existence was that "changeMaster()" originally could be called out-of-band of the actor, and so we needed to make sure the master reference did not change out from under us. Now it appears that all m

Re: s3:// sequence file startup time

2014-08-17 Thread Aaron Davidson
The driver must initially compute the partitions and their preferred locations for each part of the file, which results in a serial getFileBlockLocations() on each part. However, I would expect this to take several seconds, not minutes, to perform on 1000 parts. Is your driver inside or outside of

Re: Iterator over RDD in PySpark

2014-08-01 Thread Aaron Davidson
apply at > Iterator.scala:371, took 0.02064317 s > bytearray(b'\x80\x02K\x01.') > > I understand that returned byte array somehow corresponds to actual data, > but how can I get it? > > > > On Fri, Aug 1, 2014 at 8:49 PM, Aaron Davidson wrote: > >> rdd.

Re: Iterator over RDD in PySpark

2014-08-01 Thread Aaron Davidson
rdd.toLocalIterator will do almost what you want, but requires that each individual partition fits in memory (rather than each individual line). Hopefully that's sufficient, though. On Fri, Aug 1, 2014 at 1:38 AM, Andrei wrote: > Is there a way to get iterator from RDD? Something like rdd.colle
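
A sketch of its use:

    // Streams one partition at a time to the driver, so only a single
    // partition ever needs to fit in driver memory.
    rdd.toLocalIterator.foreach(println)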

Re: spark.shuffle.consolidateFiles seems not working

2014-07-31 Thread Aaron Davidson
Make sure to set it before you start your SparkContext -- it cannot be changed afterwards. Be warned that there are some known issues with shuffle file consolidation, which should be fixed in 1.1. On Thu, Jul 31, 2014 at 12:40 PM, Jianshi Huang wrote: > I got the number from the Hadoop admin. I

Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY

2014-07-27 Thread Aaron Davidson
I see. There should not be a significant algorithmic difference between those two cases, as far as I can think, but there is a good bit of "local-mode-only" logic in Spark. One typical problem we see on large-heap, many-core JVMs, though, is much more time spent in garbage collection. I'm not sure

Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY

2014-07-27 Thread Aaron Davidson
What are you comparing in your last experiment? Time spent in writeObject0 on a 100-core machine (!) vs a cluster? On Sat, Jul 26, 2014 at 11:59 PM, lokesh.gidra wrote: > Thanks a lot for clarifying this. This explains why there is less > serialization happening with lesser parallelism. There w

Re: Configuring Spark Memory

2014-07-24 Thread Aaron Davidson
ne mode? This is after having closely >>>>> read the documentation several times: >>>>> >>>>> *http://spark.apache.org/docs/latest/configuration.html >>>>> <http://spark.apache.org/docs/latest/configuration.html>* >>>>&g

Re: What if there are large, read-only variables shared by all map functions?

2014-07-23 Thread Aaron Davidson
In particular, take a look at the TorrentBroadcast, which should be much more efficient than HttpBroadcast (which was the default in 1.0) for large files. If you find that TorrentBroadcast doesn't work for you, then another way to solve this problem is to place the data on all nodes' local disks,
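
A sketch of opting into the torrent implementation and sharing a large read-only value (data hypothetical):

    val conf = new org.apache.spark.SparkConf()
      .set("spark.broadcast.factory",
           "org.apache.spark.broadcast.TorrentBroadcastFactory")
    val sc = new org.apache.spark.SparkContext(conf)
    val largeReadOnlyMap = Map(1 -> "a", 2 -> "b") // stand-in for the big shared data
    val lookup = sc.broadcast(largeReadOnlyMap)    // shipped once per executor
    val resolved = sc.parallelize(1 to 10).map(x => lookup.value.getOrElse(x, "?"))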

Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-21 Thread Aaron Davidson
What's the exception you're seeing? Is it an OOM? On Mon, Jul 21, 2014 at 11:20 AM, chutium wrote: > Hi, > > unfortunately it is not so straightforward > > xxx_parquet.db > > is a folder of managed database created by hive/impala, so, every sub > element in it is a table in hive/impala, they ar

Re: which kind of BlockId should I use?

2014-07-20 Thread Aaron Davidson
Hm, this is not a public API, but you should theoretically be able to use TestBlockId if you like. Internally, we just use the BlockId's natural hashing and equality to do lookups and puts, so it should work fine. However, since it is in no way public API, it may change even in maintenance releases

Re: Large Task Size?

2014-07-15 Thread Aaron Davidson
a high > overhead for running union? Could that create larger task sizes? > > Kyle > > > > On Sat, Jul 12, 2014 at 7:50 PM, Aaron Davidson > wrote: > >> I also did a quick glance through the code and couldn't find anything >> worrying that should be includ

Re: Confused by groupByKey() and the default partitioner

2014-07-13 Thread Aaron Davidson
3, 2014 at 9:13 AM, Guanhua Yan wrote: > Thanks, Aaron. I replaced groupByKey with reduceByKey along with some list > concatenation operations, and found that the performance becomes even > worse. So groupByKey is not that bad in my code. > > Best regards, > - Guanhua > >

Re: Large Task Size?

2014-07-12 Thread Aaron Davidson
I also did a quick glance through the code and couldn't find anything worrying that should be included in the task closures. The only possibly unsanitary part is the Updater you pass in -- what is your Updater and is it possible it's dragging in a significant amount of extra state? On Sat, Jul 12

Re: Putting block rdd failed when running example svm on large data

2014-07-12 Thread Aaron Davidson
Also check the web ui for that. Each iteration will have one or more stages associated with it in the driver web ui. On Sat, Jul 12, 2014 at 6:47 PM, crater wrote: > Hi Xiangrui, > > Thanks for the information. Also, it is possible to figure out the > execution > time per iteration for SVM? > >

Re: FW: memory question

2014-07-12 Thread Aaron Davidson
Spark 1.0.0 introduced the ContextCleaner to replace the MetadataCleaner API for this exact issue. The ContextCleaner automatically cleans up your RDD metadata once the RDD gets garbage collected on the driver. On Wed, Jul 9, 2014 at 3:31 AM, wrote: > Hi, > > > > Does anyone know if it is possi

Re: pyspark sc.parallelize running OOM with smallish data

2014-07-12 Thread Aaron Davidson
I think this is probably dying on the driver itself, as you are probably materializing the whole dataset inside your python driver. How large is spark_data_array compared to your driver memory? On Fri, Jul 11, 2014 at 7:30 PM, Mohit Jaggi wrote: > I put the same dataset into scala (using spark-

Re: Convert from RDD[Object] to RDD[Array[Object]]

2014-07-12 Thread Aaron Davidson
If you don't really care about the batchedDegree, but rather just want to do operations over some set of elements rather than one at a time, then just use mapPartitions(). Otherwise, if you really do want certain-sized batches and are able to relax the constraints slightly, one option is to construct the
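
A sketch of the mapPartitions() route, where rdd is a hypothetical RDD[String] and the last batch in each partition may be smaller than batchSize:

    val batchSize = 100
    val batches = rdd.mapPartitions(_.grouped(batchSize).map(_.toArray))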

Re: KMeans for large training data

2014-07-12 Thread Aaron Davidson
The "netlib.BLAS: Failed to load implementation" warning only means that the BLAS implementation may be slower than using a native one. The reason why it only shows up at the end is that the library is only used for the finalization step of the KMeans algorithm, so your job should've been wrapping

Re: Spark Questions

2014-07-12 Thread Aaron Davidson
I am not entirely certain I understand your questions, but let me assume you are mostly interested in SparkSQL and are thinking about your problem in terms of SQL-like tables. 1. Shuo Xiang mentioned Spark partitioning strategies, but in case you are talking about data partitioning or sharding as

Re: Confused by groupByKey() and the default partitioner

2014-07-12 Thread Aaron Davidson
Yes, groupByKey() does partition by the hash of the key unless you specify a custom Partitioner. (1) If you were to use groupByKey() when the data was already partitioned correctly, the data would indeed not be shuffled. Here is the associated code, you'll see that it simply checks that the Partit
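
A sketch of exploiting that, with pairs a hypothetical RDD[(String, Int)]:

    import org.apache.spark.HashPartitioner
    val part = new HashPartitioner(100)
    val prePartitioned = pairs.partitionBy(part).cache()
    val grouped = prePartitioned.groupByKey(part) // same partitioner: no shuffle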

Re: CoarseGrainedExecutorBackend: Driver Disassociated

2014-07-08 Thread Aaron Davidson
By the way, you can run the sc.getConf.get("spark.driver.host") thing inside spark-shell, whether or not the Executors actually start up successfully. On Tue, Jul 8, 2014 at 8:23 PM, Aaron Davidson wrote: > You actually should avoid setting SPARK_PUBLIC_DNS unless necessary, I

Re: CoarseGrainedExecutorBackend: Driver Disassociated

2014-07-08 Thread Aaron Davidson
You actually should avoid setting SPARK_PUBLIC_DNS unless necessary, I thought you might have preemptively done so. I think the issue is actually related to your network configuration, as Spark probably failed to find your driver's ip address. Do you see a warning on the driver that looks something

Re: Comparative study

2014-07-08 Thread Aaron Davidson
> > Not sure exactly what is happening but perhaps there are ways to > restructure your program for it to work better. Spark is definitely able to > handle much, much larger workloads. +1 @Reynold Spark can handle big "big data". There are known issues with informing the user about what went wro

Re: CoarseGrainedExecutorBackend: Driver Disassociated

2014-07-08 Thread Aaron Davidson
Hmm, looks like the Executor is trying to connect to the driver on localhost, from this line: 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler What is your setup? Standalone mode with 4 separate machines? Are yo

Re: java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)

2014-07-08 Thread Aaron Davidson
., a > call to System.gc())." > > It could be that there are many tasks running in the same node and they > all compete for running GCs which slow things down and trigger the error > you saw. By reducing the number of cores, there are more cpu resources > available to a task

Re: java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)

2014-07-08 Thread Aaron Davidson
There is a difference from actual GC overhead, which can be reduced by reusing objects, versus this error, which actually means you ran out of memory. This error can probably be relieved by increasing your executor heap size, unless your data is corrupt and it is allocating huge arrays, or you are

Re: org.jboss.netty.channel.ChannelException: Failed to bind to: master/1xx.xx..xx:0

2014-07-01 Thread Aaron Davidson
In your spark-env.sh, do you happen to set SPARK_PUBLIC_DNS or something of that kind? This error suggests the worker is trying to bind a server on the master's IP, which clearly doesn't make sense. On Mon, Jun 30, 2014 at 11:59 PM, MEETHU MATHEW wrote: > Hi, > > I did netstat -na | grep 192.168

Re: Failed to launch Worker

2014-07-01 Thread Aaron Davidson
Where are you running the spark-class version? Hopefully also on the workers. If you're trying to centrally start/stop all workers, you can add a "slaves" file to the spark conf/ directory which is just a list of your hosts, one per line. Then you can just use "./sbin/start-slaves.sh" to start the
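
A sketch of conf/slaves (hostnames hypothetical), one worker host per line:

    worker1.example.com
    worker2.example.com
    worker3.example.com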

Re: Serialization of objects

2014-07-01 Thread Aaron Davidson
If you want to stick with Java serialization and need to serialize a non-Serializable object, your best choices are probably to either subclass it with a Serializable one or wrap it in a class of your own which implements its own writeObject/readObject methods (see here: http://stackoverflow.com/qu
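
A sketch of the wrapper approach, where Foo is a hypothetical non-serializable class that can be rebuilt from a String:

    import java.io.{ObjectInputStream, ObjectOutputStream}

    class SerializableFoo(@transient var foo: Foo) extends Serializable {
      private def writeObject(out: ObjectOutputStream): Unit = {
        out.defaultWriteObject()
        out.writeUTF(foo.configString) // persist only what rebuilds foo
      }
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        foo = new Foo(in.readUTF())    // reconstruct on the deserializing side
      }
    }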

Re: Interconnect benchmarking

2014-06-27 Thread Aaron Davidson
A simple throughput test is also repartition()ing a large RDD. This also stresses the disks, though, so you might try to mount your spark temporary directory as a ramfs. On Fri, Jun 27, 2014 at 5:57 PM, danilopds wrote: > Hi, > According with the research paper bellow of Mathei Zaharia, Spark's

Re: Improving Spark multithreaded performance?

2014-06-26 Thread Aaron Davidson
I don't have specific solutions for you, but the general things to try are: - Decrease task size by broadcasting any non-trivial objects. - Increase duration of tasks by making them less fine-grained. How many tasks are you sending? I've seen in the past something like 25 seconds for ~10k total m

Re: Changing log level of spark

2014-06-25 Thread Aaron Davidson
If you're using the spark-ec2 scripts, you may have to change /root/ephemeral-hdfs/conf/log4j.properties or something like that, as that is added to the classpath before Spark's own conf. On Wed, Jun 25, 2014 at 6:10 PM, Tobias Pfeiffer wrote: > I have a log4j.xml in src/main/resources with > >

Re: jsonFile function in SQLContext does not work

2014-06-25 Thread Aaron Davidson
Is it possible you have blank lines in your input? Not that this should be an error condition, but it may be what's causing it. On Wed, Jun 25, 2014 at 11:57 AM, durin wrote: > Hi Zongheng Yang, > > thanks for your response. Reading your answer, I did some more tests and > realized that analyzi

Re: DAGScheduler: Failed to run foreach

2014-06-24 Thread Aaron Davidson
That IntRef problem is very strange, as it's not related to running a spark job, but rather just interpreting the code in the repl. There are two possibilities I can think of: - Spark was compiled with a different version of Scala than you're running it on. Spark is compiled on Scala 2.10 from Spar

Re: DAGScheduler: Failed to run foreach

2014-06-23 Thread Aaron Davidson
Please note that this: for (sentence <- sourcerdd) { ... } is actually Scala syntactic sugar which is converted into sourcerdd.foreach { sentence => ... } What this means is that this will actually run on the cluster, which is probably not what you want if you're trying to print them. Try t
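
The usual fix is to bring the data to the driver before printing, likely along these lines:

    sourcerdd.take(10).foreach(println)  // prints a sample on the driver
    sourcerdd.collect().foreach(println) // prints everything, if it fits in memory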

Re: Shark vs Impala

2014-06-23 Thread Aaron Davidson
Note that regarding a "long load time", data format means a whole lot in terms of query performance. If you load all your data into compressed, columnar Parquet files on local hardware, Spark SQL would also perform far, far better than it would reading from gzipped S3 files. You must also be carefu

Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Aaron Davidson
Yup, alright, same solution then :) On Tue, Jun 17, 2014 at 7:39 PM, Mohit Jaggi wrote: > I used --privileged to start the container and then unmounted /etc/hosts. > Then I created a new /etc/hosts file > > > On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson > wrote: > &g

Re: Executors not utilized properly.

2014-06-17 Thread Aaron Davidson
repartition() is actually just an alias of coalesce(), but with the shuffle flag set to true. This shuffle is probably what you're seeing as taking longer, but it is required when you go from a smaller number of partitions to a larger one. When actually decreasing the number of partitions, coalesc

Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Aaron Davidson
I remember having to do a similar thing in the spark docker scripts for testing purposes. Were you able to modify the /etc/hosts directly? I remember issues with that as docker apparently mounts it as part of its read-only filesystem. On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi wrote: > It was

Re: long GC pause during file.cache()

2014-06-15 Thread Aaron Davidson
Note also that Java does not work well with very large JVMs due to this exact issue. There are two commonly used workarounds: 1) Spawn multiple (smaller) executors on the same machine. This can be done by creating multiple Workers (via SPARK_WORKER_INSTANCES in standalone mode[1]). 2) Use Tachyon
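
A sketch of workaround (1) in conf/spark-env.sh (sizes hypothetical):

    # Two 30 GB workers instead of one 60 GB worker, to keep GC pauses short.
    SPARK_WORKER_INSTANCES=2
    SPARK_WORKER_MEMORY=30g
    SPARK_WORKER_CORES=8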

Re: How to specify executor memory in EC2 ?

2014-06-12 Thread Aaron Davidson
The scripts for Spark 1.0 actually specify this property in /root/spark/conf/spark-defaults.conf. I didn't know that this would override the --executor-memory flag, though; that's pretty odd. On Thu, Jun 12, 2014 at 6:02 PM, Aliaksei Litouka < aliaksei.lito...@gmail.com> wrote: > Yes, I am launc

Re: History Server renered page not suitable for load balancing

2014-06-11 Thread Aaron Davidson
A pull request would be great! On Wed, Jun 11, 2014 at 7:53 PM, elyast wrote: > Hi, > > Small issue but still. > > I run history server through Marathon and balance it through haproxy. The > problem is that links generated by HistoryPage (links to completed > applications) are absolute, e.g. ht

Re: How to enable fault-tolerance?

2014-06-09 Thread Aaron Davidson
Looks like your problem is local mode: https://github.com/apache/spark/blob/640f9a0efefd42cff86aecd4878a3a57f5ae85fa/core/src/main/scala/org/apache/spark/SparkContext.scala#L1430 For some reason, someone decided not to do retries when running in local mode. Not exactly sure why; feel free to submi

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-09 Thread Aaron Davidson
It is not a very good idea to save the results in the exact same place as the data. Any failures during the job could lead to corrupted data, because recomputing the lost partitions would involve reading the original (now-nonexistent) data. As such, the only "safe" way to do this would be to do as
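
A sketch of that safer save-then-swap pattern (paths hypothetical):

    import org.apache.hadoop.fs.{FileSystem, Path}
    rdd.saveAsTextFile("/data/output.tmp")
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.delete(new Path("/data/output"), true) // drop old data only after the save succeeds
    fs.rename(new Path("/data/output.tmp"), new Path("/data/output"))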

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Aaron Davidson
s may be left over from previous >saves, which is dangerous. > > Is this correct? > > > On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson wrote: > >> +1 please re-add this feature >> >> >> On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell >> wrote:

Re: hadoopRDD stalls reading entire directory

2014-06-02 Thread Aaron Davidson
ib/scala-compiler.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/jline.jar > > -Dspark.akka.logLifecycleEvents=true > > > -Djava.library.path=/opt/cloudera/parcels/SPARK/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native > > -Xms512m -Xmx512m org.apache.spark.deplo

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Aaron Davidson
+1 please re-add this feature On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell wrote: > Thanks for pointing that out. I've assigned you to SPARK-1677 (I think > I accidentally assigned myself way back when I created it). This > should be an easy fix. > > On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu
