Re: Spark can't pickle class: error cannot lookup attribute
Thanks Davies and Eric. I followed Davies' instructions and it works wonderfully. I would add that you can also use these scripts in the pyspark shell: pyspark --py-files support.py where support.py is your script containing your class as Davies described. Best, Guillaume Guy * +1 919 - 972 - 8750* On Wed, Feb 18, 2015 at 11:48 PM, Davies Liu dav...@databricks.com wrote: Currently, PySpark cannot pickle a class object defined in the current script ('__main__'); the workaround is to put the implementation of the class into a separate module, then use bin/spark-submit --py-files xxx.py to deploy it. in xxx.py: class test(object): def __init__(self, a, b): self.total = a + b in job.py: from xxx import test a = sc.parallelize([(True,False),(False,False)]) a.map(lambda (x,y): test(x,y)) run it by: bin/spark-submit --py-files xxx.py job.py On Wed, Feb 18, 2015 at 1:48 PM, Guillaume Guy guillaume.c@gmail.com wrote: Hi, This is a duplicate of the stack-overflow question here. I hope to generate more interest on this mailing list. The problem: I am running into some attribute lookup problems when trying to instantiate a class within my RDD. My workflow is quite standard: 1- Start with an RDD 2- Take each element of the RDD, instantiate an object for each 3- Reduce (I will write a method that will define the reduce operation later on) Here is #2: class test(object): def __init__(self, a, b): self.total = a + b a = sc.parallelize([(True,False),(False,False)]) a.map(lambda (x,y): test(x,y)) Here is the error I get: PicklingError: Can't pickle class '__main__.test': attribute lookup __main__.test failed I'd like to know if there is any way around it. Please answer with a working example that achieves the intended result (i.e. creating an RDD of objects of class test). Thanks in advance! Related question: https://groups.google.com/forum/#!topic/edx-code/9xzRJFyQwn GG
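A minimal end-to-end sketch of Davies' workaround (the two file names follow his example; the printed result is illustrative):

# support.py -- shipped to every executor via --py-files
class test(object):
    def __init__(self, a, b):
        self.total = a + b

# job.py -- the driver script
from pyspark import SparkContext
from support import test  # resolvable on the executors thanks to --py-files

sc = SparkContext(appName="pickle-workaround")
rdd = sc.parallelize([(True, False), (False, False)])
objs = rdd.map(lambda pair: test(pair[0], pair[1]))
print(objs.map(lambda t: t.total).collect())  # [1, 0]

# run it with: bin/spark-submit --py-files support.py job.py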
Re: SchemaRDD.select
The trick here is getting the scala compiler to do the implicit conversion from Symbol => Column. In your second example, the compiler doesn't know that you are going to try and use the Seq[Symbol] as a Seq[Column] and so doesn't do the conversion. The following are other ways to provide enough info to make sure it happens: // Explicitly create columns val columnList = Seq($"field1", $"field2") // Explicitly type the Seq val columnList: Seq[Column] = Seq('field1, 'field2) On Thu, Feb 19, 2015 at 12:09 PM, Cesar Flores ces...@gmail.com wrote: Well: I think that I solved my issue in the following way: val variable_fieldsStr = List("field1", "field2") val variable_argument_list = variable_fieldsStr.map(f => Alias(Symbol(f), f)()) val schm2 = myschemaRDD.select(variable_argument_list:_*) schm2 seems to have the required fields, but I would like to hear the opinion of an expert about it. Thanks On Thu, Feb 19, 2015 at 12:01 PM, Cesar Flores ces...@gmail.com wrote: I am trying to pass a variable number of arguments to the select function of a SchemaRDD I created, as I want to select the fields at run time: val variable_argument_list = List('field1, 'field2) val schm1 = myschemaRDD.select('field1, 'field2) // works val schm2 = myschemaRDD.select(variable_argument_list:_*) // does not work I am interested in selecting at run time the fields from the myschemaRDD variable. However, the usual way of passing a variable number of arguments as a List in Scala fails. Is there a way of selecting a variable number of arguments in the select function? If not, what would be a better approach for selecting the required fields at run time? Thanks in advance for your help -- Cesar Flores -- Cesar Flores
Streaming Linear Regression
Hi I tried to run Streaming Linear Regression on my local machine. val trainingData = ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse) textFileStream is not seeing the new files. I searched on the Internet and saw that somebody had the same issue, but no solution was found for it. Does anyone have any thoughts on this? Has anybody managed to run streaming linear regression? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-tp21726.html
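One frequent cause (an assumption, not confirmed from the post): textFileStream only picks up files that appear in the watched directory atomically with a fresh modification time, so files that are slowly copied in place, or that already exist at startup, are silently skipped. A minimal PySpark sketch of the safe pattern, with hypothetical paths and file names:

# --- driver ---
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="stream-train")
ssc = StreamingContext(sc, 5)  # 5-second batches
lines = ssc.textFileStream("/home/barisakgu/Desktop/Spark/train")
lines.pprint()  # just prove that new files are being seen
ssc.start()
ssc.awaitTermination()

# --- producer (run separately) ---
import os
# Write to a staging dir on the SAME filesystem, then rename into the
# watched dir: rename is atomic, so the file appears fully formed.
os.rename("/home/barisakgu/Desktop/Spark/staging/part-0001.txt",
          "/home/barisakgu/Desktop/Spark/train/part-0001.txt")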
Re: Spark job fails on cluster but works fine on a single machine
Yeah, I do manually delete the files, but it still fails with this error. On Feb 19, 2015, at 8:16 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: When writing to hdfs Spark will not overwrite existing files or directories. You must either manually delete these or use Java's Hadoop FileSystem class to remove them. Sent with Good (www.good.com) -Original Message- From: Pavel Velikhov [pavel.velik...@gmail.com] Sent: Thursday, February 19, 2015 11:32 AM Eastern Standard Time To: user@spark.apache.org Subject: Spark job fails on cluster but works fine on a single machine I have a simple Spark job that goes out to Cassandra, runs a pipe and stores results: val sc = new SparkContext(conf) val rdd = sc.cassandraTable("keyspace", "table") .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags")))) .pipe("python3 /tmp/scripts_and_models/scripts/run.py") .map(r => convertStr(r)) .coalesce(1,true) .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt") //.saveToCassandra("keyspace", "table", SomeColumns("id", "data")) When run on a single machine, everything is fine if I save to an hdfs file or save to Cassandra. When run in cluster neither works: - When saving to file, I get an exception: User class threw exception: Output directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt already exists - When saving to Cassandra, only 4 rows are updated with empty data (I test on a 4-machine Spark cluster) Any hints on how to debug this and where the problem could be? - I delete the hdfs file before running - Would really like the output to hdfs to work, so I can debug - Then it would be nice to save to Cassandra
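For the "output directory already exists" failure, deleting the target from the driver immediately before the save is the usual fix; doing it by hand before launching only helps if nothing recreates the directory in between. A sketch in PySpark (the thread's code is Scala, but the same Hadoop FileSystem calls apply); note that sc._jvm and sc._jsc are private py4j handles:

from pyspark import SparkContext

sc = SparkContext(appName="overwrite-output")
path = "hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt"

jvm = sc._jvm  # private handle into the JVM
conf = sc._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(jvm.java.net.URI(path), conf)
p = jvm.org.apache.hadoop.fs.Path(path)
if fs.exists(p):
    fs.delete(p, True)  # recursive delete, done right before the job writes
# ... build the RDD and call saveAsTextFile(path) afterwards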
Spark streaming program with Tranquility/Druid
Hello all, I am trying to integrate Spark with Tranquility (https://github.com/metamx/tranquility) and when I start the Spark program, I get the error below: java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass at org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524) at org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732) at org.codehaus.jackson.map.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:427) at org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:398) at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307) at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287) . The Tranquility library uses Jackson 2.4.4 (as does Druid) while I think something in Spark is using jackson-core-asl which is the deprecated version of the Jackson library. I am using sbt assembly to create an assembly file to use via spark-submit and I tried to force it to add the older version containing the package referenced in the error above. I see that the assembly jar has that package now but it is still being ignored by Spark. Any idea how to go about finding a solution? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-program-with-Tranquility-Druid-tp21725.html
Re: Unknown sample in Naive Bayes
If you know there is data that doesn't belong to any existing category, put it into the training set and make a new category for it. It won't help much if instances from this unknown category are all outliers. In that case, lower the thresholds and tune the parameters to get a lower error rate. -Xiangrui On Thu, Feb 19, 2015 at 8:58 AM, Jatinpreet Singh jatinpr...@gmail.com wrote: Hi Xiangrui, Thanks for the answer. The problem is that in my application, I cannot stop the user from scoring any type of sample against the trained model. So, even if the class of a completely unknown sample has not been trained, the model will put it in one of the categories with high probability. I wish to eliminate this with some kind of probability threshold. Is this possible in any way with Naive Bayes? Would changing the classification algorithm help in this regard? I appreciate any help on this. Thanks, Jatin On Wed, Feb 18, 2015 at 3:07 AM, Xiangrui Meng men...@gmail.com wrote: If there exists a sample that doesn't belong to A/B/C, it means that there exists another class D or Unknown besides A/B/C. You should have some of these samples in the training set in order to let naive Bayes learn the priors. -Xiangrui On Tue, Feb 10, 2015 at 10:44 PM, jatinpreet jatinpr...@gmail.com wrote: Hi, I am using MLlib's Naive Bayes classifier to classify textual data. I am accessing the posterior probabilities through a hack for each class. Once I have trained the model, I want to remove documents whose confidence of classification is low. Say for a document, if the highest class probability is less than a pre-defined threshold (separate for each class), categorize this document as 'unknown'. Say there are three classes A, B and C with thresholds 0.35, 0.32 and 0.33 respectively, defined after training and testing. If I score a sample that belongs to none of the three categories, I wish to classify it as 'unknown'. But the issue is I can get a probability higher than these thresholds for a document that doesn't belong to the trained categories. Is there any technique I can apply to segregate documents that belong to untrained classes with a certain degree of confidence? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unknown-sample-in-Naive-Baye-s-tp21594.html -- Regards, Jatinpreet Singh
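A sketch of the thresholding Jatin describes, assuming a multinomial pyspark.mllib NaiveBayesModel whose labels, pi and theta are exposed as numpy arrays; the per-class thresholds dict is hypothetical:

import numpy as np

def classify_with_threshold(model, x, thresholds, unknown="unknown"):
    # Multinomial NB log-posterior up to a constant:
    #   log P(c|x) ~ log pi_c + sum_i x_i * log theta_ci
    log_post = model.pi + model.theta.dot(x)
    post = np.exp(log_post - log_post.max())  # shift for numeric stability
    post /= post.sum()                        # normalize to probabilities
    best = int(np.argmax(post))
    label = model.labels[best]
    # Reject when the winning posterior is below that class's cutoff.
    return label if post[best] >= thresholds.get(label, 0.0) else unknown

thresholds = {0.0: 0.35, 1.0: 0.32, 2.0: 0.33}  # A, B, C cutoffs from the post

Note this only normalizes Naive Bayes' often overconfident posteriors; as Xiangrui says, it cannot by itself make out-of-domain samples score low.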
Re: stack map functions in a loop (pyspark)
On Thu, Feb 19, 2015 at 7:57 AM, jamborta jambo...@gmail.com wrote: Hi all, I think I have run into an issue with the lazy evaluation of variables in pyspark. I have the following: functions = [func1, func2, func3] for counter in range(len(functions)): data = data.map(lambda value: [functions[counter](value)]) You need to create a wrapper for counter: def mapper(f): return lambda v: [f(v)] for f in functions: data = data.map(mapper(f)) It looks like the counter is evaluated when the RDD is computed, so it fills in all three mappers with the last value of it. Is there any way to force it to be evaluated at the time? (I am aware that I could persist it after each step, which sounds a bit of a waste) thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/stack-map-functions-in-a-loop-pyspark-tp21722.html
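An equivalent fix without the wrapper, using Python's default-argument binding (a common idiom for exactly this closure pitfall):

for f in functions:
    # f=f freezes the current loop value at lambda-definition time,
    # instead of looking f up later when the RDD is actually computed
    data = data.map(lambda value, f=f: [f(value)])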
Re: SchemaRDD.select
Well: I think that I solved my issue in the following way: val variable_fieldsStr = List("field1", "field2") val variable_argument_list = variable_fieldsStr.map(f => Alias(Symbol(f), f)()) val schm2 = myschemaRDD.select(variable_argument_list:_*) schm2 seems to have the required fields, but I would like to hear the opinion of an expert about it. Thanks On Thu, Feb 19, 2015 at 12:01 PM, Cesar Flores ces...@gmail.com wrote: I am trying to pass a variable number of arguments to the select function of a SchemaRDD I created, as I want to select the fields at run time: val variable_argument_list = List('field1, 'field2) val schm1 = myschemaRDD.select('field1, 'field2) // works val schm2 = myschemaRDD.select(variable_argument_list:_*) // does not work I am interested in selecting at run time the fields from the myschemaRDD variable. However, the usual way of passing a variable number of arguments as a List in Scala fails. Is there a way of selecting a variable number of arguments in the select function? If not, what would be a better approach for selecting the required fields at run time? Thanks in advance for your help -- Cesar Flores -- Cesar Flores
spark-sql problem with textfile separator
Hello everybody, I'm quite new to Spark and Scala as well, and I was trying to analyze some csv data via spark-sql. My csv file contains data whose third column (address) itself includes commas. Following the example at the link below https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection I wrote the equivalent code for my file. When I try to fetch val name_address, the second field (address) does not contain all the text. The issue is probably the comma: it is at the same time the split character and a string value of the third column (address) of the csv file. How can I handle this? Thank you in advance Sparkino -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-problem-with-textfile-separator-tp21718.html
Re: Unzipping large files and 2GB partition size.
Thanks for your reply Sean. Looks like it's happening in a map: 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:-2) That's my initial 'parse' stage, done before repartitioning. It reduces the data size significantly, so I thought it would be sensible to do it before repartitioning, which involves moving lots of data around. That might be a stupid idea in hindsight! So the obvious thing to try would be to repartition before the map, as the first transformation. I would have done that already if I could be sure it would succeed or fail quickly. I'm not entirely clear about the lazy execution of transformations in the DAG. It could be that the error is manifesting during the mapToPair but is caused by the earlier read-from-text-file stage. Thanks for the pointers to those compression formats. I'll give them a go (although it's not trivial to re-encode 200 GB of data on S3, so if I can get this working reasonably with gzip I'd like to). Any advice about whether this error can be worked around with an early repartition? Cheers Joe On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote: gzip and zip are not splittable compression formats; bzip and lzo are. Ideally, use a splittable compression format. Repartitioning is not a great solution since it means a shuffle, typically. This is not necessarily related to how big your partitions are. The question is, when does this happen? what operation? On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote: On the advice of some recent discussions on this list, I thought I would try and consume gz files directly. I'm reading them, doing a preliminary map, then repartitioning, then doing normal spark things. As I understand it, zip files aren't readable in partitions because of the format, so I thought that repartitioning would be the next best thing for parallelism. I have about 200 files, some about 1GB compressed and some over 2GB uncompressed. I'm hitting the 2GB maximum partition size. It's been discussed on this list (topic: 2GB limit for partitions?, tickets SPARK-1476 and SPARK-1391). Stack trace at the end. This happened at 10 hours in (probably when it saw its first file). I can't just re-run it quickly! Does anyone have any advice? Might I solve this by re-partitioning as the first step after reading the file(s)? Or is it effectively impossible to read a gz file that expands to over 2GB? Does anyone have any experience with this? Thanks in advance Joe Stack trace: Exception in thread main 15/02/18 20:44:25 INFO scheduler.TaskSetManager: Lost task 5.3 in stage 1.0 (TID 283) on executor: java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE) [duplicate 6] org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
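A sketch of the reordering Joe is considering, in PySpark for brevity (path, partition count and parse function are all hypothetical): repartition straight after the read, before the parse map and before anything is cached, so no single partition grows past the 2GB limit:

raw = sc.textFile("s3n://bucket/logs/*.gz")  # each .gz file arrives as ONE partition
spread = raw.repartition(512)                # shuffle first, while records are raw lines
parsed = spread.map(parse_record)            # hypothetical parse function
parsed.cache()                               # cached partitions now stay well under 2GB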
In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and
Hello, We have a Spark Streaming application that watches an input directory, and as files are copied there the application reads them, sends the contents to a RESTful web service, receives a response, and writes some content to an output directory. When testing the application by copying a few thousand files at once to its input directory, we have realized that after having processed about 3800 files, it logs messages like the following: 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size 9960 dropped from memory (free 447798720) 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) and then the Spark Streaming application dies. What might be the potential causes to check for such errors? Below you can see the last few lines before it dies: 15/02/19 10:22:03 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 12894 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(20978) called with curMem=107884847, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894_piece0 stored as bytes in memory (estimated size 20.5 KB, free 427.4 MB) 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block broadcast_12894_piece0 15/02/19 10:22:04 INFO broadcast.TorrentBroadcast: Reading broadcast variable 12894 took 460 ms 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(347363) called with curMem=107905825, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894 stored as values in memory (estimated size 339.2 KB, free 427.0 MB) 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(1079) called with curMem=108253188, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block rdd_30466_35 stored as bytes in memory (estimated size 1079.0 B, free 427.0 MB) 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block rdd_30466_35 15/02/19 10:22:05 INFO storage.MemoryStore: ensureFreeSpace(5) called with curMem=108254267, maxMem=556038881 15/02/19 10:22:05 INFO storage.MemoryStore: Block rdd_30467_35 stored as bytes in memory (estimated size 5.0 B, free 427.0 MB) 15/02/19 10:22:05 INFO storage.BlockManagerMaster: Updated info of block rdd_30467_35 15/02/19 10:22:05 INFO executor.Executor: Finished task 35.0 in stage 351.0 (TID 12229). 
2353 bytes result sent to driver 15/02/19 10:22:06 INFO storage.BlockManager: Removing broadcast 17935 15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935_piece0 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935_piece0 of size 4151 dropped from memory (free 447788760) 15/02/19 10:22:06 INFO storage.BlockManagerMaster: Updated info of block broadcast_17935_piece0 15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size 9960 dropped from memory (free 447798720) 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/19 10:23:28 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/19 10:24:01 WARN util.AkkaUtils: Error sending message in 3 attempts
Unzipping large files and 2GB partition size.
On the advice of some recent discussions on this list, I thought I would try and consume gz files directly. I'm reading them, doing a preliminary map, then repartitioning, then doing normal spark things. As I understand it, zip files aren't readable in partitions because of the format, so I thought that repartitioning would be the next best thing for parallelism. I have about 200 files, some about 1GB compressed and some over 2GB uncompressed. I'm hitting the 2GB maximum partition size. It's been discussed on this list (topic: 2GB limit for partitions?, tickets SPARK-1476 and SPARK-1391). Stack trace at the end. This happened at 10 hours in (probably when it saw its first file). I can't just re-run it quickly! Does anyone have any advice? Might I solve this by re-partitioning as the first step after reading the file(s)? Or is it effectively impossible to read a gz file that expands to over 2GB? Does anyone have any experience with this? Thanks in advance Joe Stack trace: Exception in thread main 15/02/18 20:44:25 INFO scheduler.TaskSetManager: Lost task 5.3 in stage 1.0 (TID 283) on executor: java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE) [duplicate 6] org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
Re: loads of memory still GC overhead limit exceeded
This should result in 4 executors, not 25. They should be able to execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB of RAM, not 1TB. It still feels like this shouldn't be running out of memory, not by a long shot though. But just pointing out potential differences between what you are expecting and what you are configuring. On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running spark 1.2.0 in yarn-client mode with the following layout: spark.executor.cores=4 spark.executor.memory=28G spark.yarn.executor.memoryOverhead=4096 I am submitting a bigger ALS trainImplicit task (rank=100, iters=15) on a dataset with ~3 billion ratings using 25 executors. At some point some executor crashes with: 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage 51.0 (TID 7259) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.reflect.Array.newInstance(Array.java:75) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) So the GC overhead limit exceeded is pretty clear and would suggest running out of memory. Since I have 1TB of RAM available this must rather be due to some suboptimal configuration. Can anyone please point me to some directions on how to tackle this? Thanks, Antony.
Re: Tableau beta connector
Hi, I would like you to read my stack overflow answer to this question. If you need more clarification feel free to drop a msg. http://stackoverflow.com/questions/28403664/connect-to-existing-hive-in-intellij-using-sbt-as-build Regards, Ashutosh From: ganterm [via Apache Spark User List] ml-node+s1001560n21709...@n3.nabble.com Sent: Thursday, February 19, 2015 12:49 AM To: Ashutosh Trivedi (MT2013030) Subject: Re: Tableau beta connector Ashutosh, Were you able to figure this out? I am having the exact same question. I think the answer is to use Spark SQL to create/load a table in Hive (e.g. execute the HiveQL CREATE TABLE statement) but I am not sure. Hoping for something more simple than that. Anybody? Thanks!
Re: In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and
What version of Spark are you using? TD On Thu, Feb 19, 2015 at 2:45 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, We have a Spark Streaming application that watches an input directory, and as files are copied there the application reads them and sends the contents to a RESTful web service, receives a response and write some contents to an output directory. When testing the application by copying a few thousand files at once to its input directory, we have realized that after having processed about 3800 files, it creates messages as the following in the log file: 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size 9960 dropped from memory (free 447798720) 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) and then the Spark Streaming application dies. What might be the potential causes to check for such errors? Below you can see last few lines before it dies: 15/02/19 10:22:03 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 12894 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(20978) called with curMem=107884847, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894_piece0 stored as bytes in memory (estimated size 20.5 KB, free 427.4 MB) 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block broadcast_12894_piece0 15/02/19 10:22:04 INFO broadcast.TorrentBroadcast: Reading broadcast variable 12894 took 460 ms 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(347363) called with curMem=107905825, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894 stored as values in memory (estimated size 339.2 KB, free 427.0 MB) 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(1079) called with curMem=108253188, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block rdd_30466_35 stored as bytes in memory (estimated size 1079.0 B, free 427.0 MB) 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block rdd_30466_35 15/02/19 10:22:05 INFO storage.MemoryStore: ensureFreeSpace(5) called with curMem=108254267, maxMem=556038881 15/02/19 10:22:05 INFO storage.MemoryStore: Block rdd_30467_35 stored as bytes in memory (estimated size 5.0 B, free 427.0 MB) 15/02/19 10:22:05 INFO storage.BlockManagerMaster: Updated info of block rdd_30467_35 15/02/19 10:22:05 INFO executor.Executor: Finished task 35.0 in stage 351.0 (TID 12229). 
2353 bytes result sent to driver 15/02/19 10:22:06 INFO storage.BlockManager: Removing broadcast 17935 15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935_piece0 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935_piece0 of size 4151 dropped from memory (free 447788760) 15/02/19 10:22:06 INFO storage.BlockManagerMaster: Updated info of block broadcast_17935_piece0 15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size 9960 dropped from memory (free 447798720) 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/19 10:23:28 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at
storing MatrixFactorizationModel (pyspark)
Hi, when getting the model out of ALS.train it would be beneficial to store it (to disk) so the model can be reused later for any following predictions. I am using pyspark and I had no luck pickling it, either using the standard pickle module or even dill. Does anyone have a solution for this (note it is pyspark)? Thank you, Antony.
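The Python model object is a thin wrapper around a JVM reference, which is why pickle and dill both fail on it. A workaround sketch: persist the factor RDDs themselves and rebuild predictions with a dot product; this assumes userFeatures()/productFeatures() are available in your PySpark version, and the paths are hypothetical:

import numpy as np

# Save: the factors are ordinary RDDs of (id, list-of-doubles) pairs.
model.userFeatures().saveAsPickleFile("hdfs:///models/als/userFeatures")
model.productFeatures().saveAsPickleFile("hdfs:///models/als/productFeatures")

# Load later and score by hand (collect only if the factors fit in memory).
users = dict(sc.pickleFile("hdfs:///models/als/userFeatures").collect())
products = dict(sc.pickleFile("hdfs:///models/als/productFeatures").collect())

def predict(user_id, product_id):
    return float(np.dot(users[user_id], products[product_id]))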
Regarding minimum number of partitions while reading data from Hadoop
Hi, In our job we need to process the data in small chunks, so as to avoid GC and other issues. For this we are using the old Hadoop API, as it lets us specify a parameter like minPartitions. Does anyone know if there is a way to do the same via the newHadoopAPI as well? How would that differ from the older API? I am a little aware of the split-size settings, but not of any promise that a minimum number of partitions will actually be honored. Any pointers will be of help. Thanks, Twinkle
Re: Unzipping large files and 2GB partition size.
gzip and zip are not splittable compression formats; bzip and lzo are. Ideally, use a splittable compression format. Repartitioning is not a great solution since it means a shuffle, typically. This is not necessarily related to how big your partitions are. The question is, when does this happen? what operation? On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote: On the advice of some recent discussions on this list, I thought I would try and consume gz files directly. I'm reading them, doing a preliminary map, then repartitioning, then doing normal spark things. As I understand it, zip files aren't readable in partitions because of the format, so I thought that repartitioning would be the next best thing for parallelism. I have about 200 files, some about 1GB compressed and some over 2GB uncompressed. I'm hitting the 2GB maximum partition size. It's been discussed on this list (topic: 2GB limit for partitions?, tickets SPARK-1476 and SPARK-1391). Stack trace at the end. This happened at 10 hours in (probably when it saw its first file). I can't just re-run it quickly! Does anyone have any advice? Might I solve this by re-partitioning as the first step after reading the file(s)? Or is it effectively impossible to read a gz file that expands to over 2GB? Does anyone have any experience with this? Thanks in advance Joe Stack trace: Exception in thread main 15/02/18 20:44:25 INFO scheduler.TaskSetManager: Lost task 5.3 in stage 1.0 (TID 283) on executor: java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE) [duplicate 6] org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
loads of memory still GC overhead limit exceeded
Hi, I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running spark 1.2.0 in yarn-client mode with the following layout: spark.executor.cores=4 spark.executor.memory=28G spark.yarn.executor.memoryOverhead=4096 I am submitting a bigger ALS trainImplicit task (rank=100, iters=15) on a dataset with ~3 billion ratings using 25 executors. At some point some executor crashes with: 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage 51.0 (TID 7259) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.reflect.Array.newInstance(Array.java:75) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) So the GC overhead limit exceeded is pretty clear and would suggest running out of memory. Since I have 1TB of RAM available this must rather be due to some suboptimal configuration. Can anyone please point me to some directions on how to tackle this? Thanks, Antony.
Re: loads of memory still GC overhead limit exceeded
based on spark UI I am running 25 executors for sure. why would you expect four? I submit the task with --num-executors 25 and I get 6-7 executors running per host (using more, smaller executors gives me better cluster utilization when running parallel spark sessions (not the case for this reported issue - for now I am using the cluster exclusively)). thx, Antony. On Thursday, 19 February 2015, 11:02, Sean Owen so...@cloudera.com wrote: This should result in 4 executors, not 25. They should be able to execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB of RAM, not 1TB. It still feels like this shouldn't be running out of memory, not by a long shot though. But just pointing out potential differences between what you are expecting and what you are configuring. On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running spark 1.2.0 in yarn-client mode with the following layout: spark.executor.cores=4 spark.executor.memory=28G spark.yarn.executor.memoryOverhead=4096 I am submitting a bigger ALS trainImplicit task (rank=100, iters=15) on a dataset with ~3 billion ratings using 25 executors. At some point some executor crashes with: 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage 51.0 (TID 7259) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.reflect.Array.newInstance(Array.java:75) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) So the GC overhead limit exceeded is pretty clear and would suggest running out of memory. Since I have 1TB of RAM available this must rather be due to some suboptimal configuration. Can anyone please point me to some directions on how to tackle this? Thanks, Antony.
Re: loads of memory still GC overhead limit exceeded
Oh OK you are saying you are requesting 25 executors and getting them, got it. You can consider making fewer, bigger executors to pool rather than split up your memory, but at some point it becomes counter-productive. 32GB is a fine executor size. So you have ~8GB available per task which seems like plenty. Something else is at work here. Is this error from your code's stages or ALS? On Thu, Feb 19, 2015 at 10:07 AM, Antony Mayi antonym...@yahoo.com wrote: based on spark UI I am running 25 executors for sure. why would you expect four? I submit the task with --num-executors 25 and I get 6-7 executors running per host (using more, smaller executors gives me better cluster utilization when running parallel spark sessions (not the case for this reported issue - for now I am using the cluster exclusively)). thx, Antony. On Thursday, 19 February 2015, 11:02, Sean Owen so...@cloudera.com wrote: This should result in 4 executors, not 25. They should be able to execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB of RAM, not 1TB. It still feels like this shouldn't be running out of memory, not by a long shot though. But just pointing out potential differences between what you are expecting and what you are configuring. On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running spark 1.2.0 in yarn-client mode with the following layout: spark.executor.cores=4 spark.executor.memory=28G spark.yarn.executor.memoryOverhead=4096 I am submitting a bigger ALS trainImplicit task (rank=100, iters=15) on a dataset with ~3 billion ratings using 25 executors. At some point some executor crashes with: 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage 51.0 (TID 7259) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.reflect.Array.newInstance(Array.java:75) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) So the GC overhead limit exceeded is pretty clear and would suggest running out of memory. Since I have 1TB of RAM available this must rather be due to some suboptimal configuration. Can anyone please point me to some directions on how to tackle this? Thanks, Antony.
Re: loads of memory still GC overhead limit exceeded
it is from within the ALS.trainImplicit() call. btw. the exception varies between this GC overhead limit exceeded and Java heap space (which I guess is just a different outcome of the same problem). just tried another run and here are the logs (filtered) - note I tried this run with spark.shuffle.io.preferDirectBufs=false so this might be a slightly different issue from my previous case (going to revert now): === spark stdout === 15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms 15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms 15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost) 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost) 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost) 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost) 15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6 15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) === yarn log === 15/02/19 10:15:05 WARN executor.Executor: Told to re-register on heartbeat 15/02/19 10:16:02 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM 15/02/19 10:16:02 WARN server.TransportChannelHandler: Exception in connection from /192.168.1.92:45633 io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Java heap space at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:280) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) === yarn nodemanager log === 2015-02-19 10:16:45,146 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20284 for container-id container_1424204221358_0012_01_16: 28.5 GB of 32 GB physical memory used; 29.1 GB of 67.2 GB virtual memory used 2015-02-19 10:16:45,163 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20273 for container-id container_1424204221358_0012_01_20: 28.5 GB of 32 GB physical memory used; 29.2 GB of 67.2 GB virtual memory used 2015-02-19 10:16:46,621 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0012_01_08 is : 143 2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0012_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE 2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0012_01_08 thanks for any help, Antony. ps. could that be Java 8 related? On Thursday, 19 February 2015, 11:25, Sean Owen so...@cloudera.com wrote: Oh OK you are saying you are requesting 25 executors and getting them, got it. You can consider making fewer, bigger executors to pool rather than split up your memory, but at some point it becomes counter-productive. 32GB is a fine executor size. So you have ~8GB available per task which seems like plenty. Something else is at work here. Is this error from your code's stages or ALS? On Thu, Feb 19, 2015 at 10:07 AM, Antony Mayi antonym...@yahoo.com wrote: based on spark UI I am running 25 executors for sure. why would you expect four? I submit the task with --num-executors 25 and I get 6-7 executors running per host (using
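One configuration angle worth trying (an assumption, not a confirmed fix for this thread): raise the number of ALS blocks so each task materializes a smaller slice of the factor matrices; with ~3 billion ratings the default of one block per input partition can make individual task buffers very large. A PySpark sketch with hypothetical numbers:

from pyspark.mllib.recommendation import ALS

# ratings: the job's RDD of (user, product, rating) triples
model = ALS.trainImplicit(ratings, rank=100, iterations=15,
                          blocks=2000,  # more blocks => smaller per-task working set
                          alpha=40.0)   # alpha value is illustrative only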
Re: In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and
On Thu, Feb 19, 2015 at 12:27 PM, Tathagata Das t...@databricks.com wrote: What version of Spark are you using? TD Spark version is 1.2.0 (running on Cloudera CDH 5.3.0) -- Emre Sevinç
Re: Regarding minimum number of partitions while reading data from Hadoop
I think that the newer Hadoop API does not expose this suggested min partitions parameter like the old one did. I believe you can try setting mapreduce.input.fileinputformat.split.{min,max}size instead on the Hadoop Configuration to suggest a max/min split size, and therefore bound the number of partitions you get back. On Thu, Feb 19, 2015 at 11:07 AM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, In our job, we need to process the data in small chunks, so as to avoid GC and other stuff. For this, we are using old API of hadoop as that let us specify parameter like minPartitions. Does any one knows, If there a way to do the same via newHadoopAPI also? How that way will be different from older API? I am little bit aware of split size stuff, but not much aware regarding any promise that minimum number of partitions criteria gets satisfied or not. Any pointers will be of help. Thanks, Twinkle
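A sketch of Sean's suggestion from the PySpark side (the property names are the standard Hadoop 2 ones; the path and split sizes are hypothetical): capping the max split size forces at least total-input-size divided by max-split partitions.

conf = {
    # values are bytes, passed as strings through the Hadoop Configuration
    "mapreduce.input.fileinputformat.split.maxsize": str(128 * 1024 * 1024),
    "mapreduce.input.fileinputformat.split.minsize": str(16 * 1024 * 1024),
}
rdd = sc.newAPIHadoopFile(
    "hdfs:///data/input",
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf=conf)
lines = rdd.map(lambda kv: kv[1])  # drop the byte-offset key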
Re: spark-sql problem with textfile separator
This is because each line will be separated into 4 columns instead of 3. If you use a comma to separate columns, then no column value may itself contain a comma. 2015-02-19 18:12 GMT+08:00 sparkino francescoboname...@gmail.com: Hello everybody, I'm quite new to Spark and Scala as well, and I was trying to analyze some csv data via spark-sql. My csv file contains data whose third column (address) itself includes commas. Following the example at the link below https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection I wrote the equivalent code for my file. When I try to fetch val name_address, the second field (address) does not contain all the text. The issue is probably the comma: it is at the same time the split character and a string value of the third column (address) of the csv file. How can I handle this? Thank you in advance Sparkino -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-problem-with-textfile-separator-tp21718.html
Spark on Mesos: Multiple Users with iPython Notebooks
I am running Spark on Mesos and it works quite well. I have three users, all of whom set up iPython notebooks that instantiate a spark instance to work with in the notebooks. I love it so far. Since I am auto-instantiating (I don't want a user to have to think about instantiating and submitting a spark app to do adhoc analysis, I want the environment set up ahead of time), this is done whenever an iPython notebook is opened. So far it's working pretty well, save one issue: Every notebook is a new driver. I.e. every time they open a notebook, a new spark-submit is called, and the driver resources are allocated, regardless of whether they are used or not. Yes, it's only the driver, but even that, I find, starts slowing down my queries for the notebooks using spark. (I am running in Mesos fine-grained mode). I have three users on my system; ideally, I would love to find a way so that on the first notebook being opened, a driver is started for that user and can then be used for any notebook the user has open. So if they open a new notebook, I can check that yes, the user has a spark driver running, and thus that notebook, if there is a query, will run it through that driver. That allows me to understand the resource allocation better, and it keeps users from running 10 notebooks and tying up a lot of resources. The other thing I was wondering is: could the driver actually be run on the Mesos cluster? Right now, I have an edge node as an iPython server; the drivers all exist on that server, so as I get more and more drivers, the box's local resources get depleted by unused drivers. Obviously if I could reuse the drivers per user on that box, that is a great first step, but if I could reuse drivers and run them on the cluster, that would be ideal. Looking through the docs I was not clear on those options. If anyone could point me in the right direction, I would greatly appreciate it! John
Re: ML Transformer
Hi Cesar, these methods will remain private until the new ml API stabilizes (approx. in Spark 1.4). My solution for the same issue was to create an org.apache.spark.ml package in my own project and extend/implement everything there. Thanks, Peter Rudenko On 2015-02-18 22:17, Cesar Flores wrote: I am working right now with the ML pipeline, which I really like. However, in order to make real use of it, I would like to create my own transformers that implement org.apache.spark.ml.Transformer. In order to do that, a method from PipelineStage needs to be implemented. But this method is private to the ml package: private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType Can users create their own transformers? If not, will this functionality be added in the future? Thanks -- Cesar Flores
Re: loads of memory still GC overhead limit exceeded
now with reverted spark.shuffle.io.preferDirectBufs (to true) getting again GC overhead limit exceeded: === spark stdout ===15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) === yarn log (same) ===15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)java.lang.OutOfMemoryError: GC overhead limit exceeded at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) === yarn nodemanager ===2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 1432015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08 Antony. On Thursday, 19 February 2015, 11:54, Antony Mayi antonym...@yahoo.com.INVALID wrote: it is from within the ALS.trainImplicit() call. btw. the exception varies between this GC overhead limit exceeded and Java heap space (which I guess is just different outcome of same problem). 
just tried another run and here are the logs (filtered) - note I tried this run with spark.shuffle.io.preferDirectBufs=false so this might be slightly different issue from my previous case (going to revert now):

=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6
15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at
Re: spark-sql problem with textfile separator
Hi Yanbo, unfortunately all csv files contain commas inside some columns and I can't change the structure. How can I work with this kind of textfile and spark-sql? Thank you again 2015-02-19 14:38 GMT+01:00 Yanbo Liang hackingda...@gmail.com: This is because each line will be separated into 4 columns instead of 3 columns. If you want to use a comma to separate different columns, no column will be allowed to include commas. 2015-02-19 18:12 GMT+08:00 sparkino francescoboname...@gmail.com: Hello everybody, I'm quite new to Spark and Scala as well, and I was trying to analyze some csv data via spark-sql. My csv file contains data like this Following the example at this link below https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection I have this code: When I try to fetch val name_address, the second field (address) does not contain all the text. The issue is probably the comma: it is at the same time the split character and a string value of the third column (address) of the csv file. How can I handle this? Thank you in advance Sparkino -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-problem-with-textfile-separator-tp21718.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: JdbcRDD, ClassCastException with scala.Function0
At the beginning of the code, do a query to find the current maximum ID. Don't just put in an arbitrarily large value, or all of your rows will end up in 1 spark partition at the beginning of the range. (See the sketch after this message.) The question of keys is up to you... all that you need to be able to do is write a sql statement that takes 2 numbers to specify the bounds. Of course, a numeric primary key is going to be the most efficient way to do that. On Thu, Feb 19, 2015 at 8:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Yup, I did see that. Good point though, Cody. The mismatch was happening for me when I was trying to get the 'new JdbcRDD' approach going. Once I switched to the 'create' method things are working just fine. Was just able to refactor the 'get connection' logic into a 'DbConnection implements JdbcRDD.ConnectionFactory' and my 'map row' class is still 'MapRow implements org.apache.spark.api.java.function.Function<ResultSet, Row>'. This works fine and makes the driver program tighter. Of course, my next question is how to work with the lower and upper bound parameters. As in, what if I don't know what the min and max ID values are and just want to extract all data from the table, what should the params be, if that's even supported? And furthermore, what if the primary key on the table is not numeric? Or if there's no primary key altogether? The method works fine with lowerBound=0 and upperBound=100, for example, but doesn't seem to have a way to say 'no upper bound' (-1 didn't work). On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger c...@koeninger.org wrote: Look at the definition of JdbcRDD.create:

def create[T](
    sc: JavaSparkContext,
    connectionFactory: ConnectionFactory,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {

JFunction here is the interface org.apache.spark.api.java.function.Function, not scala Function0. Likewise, ConnectionFactory is an interface defined inside JdbcRDD, not scala Function0. On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: That's exactly what I was doing. However, I ran into runtime issues with doing that. For instance, I had a public class DbConnection extends AbstractFunction0<Connection> implements Serializable. I got a runtime error from Spark complaining that DbConnection wasn't an instance of scala.Function0. I also had a public class MapRow extends scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements Serializable, with which I seemed to have more luck. On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger c...@koeninger.org wrote: Can't you implement the org.apache.spark.api.java.function.Function interface and pass an instance of that to JdbcRDD.create? On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Cody, you were right, I had a copy and paste snag where I ended up with a vanilla SparkContext rather than a Java one. I also had to *not* use my function subclasses, rather just use anonymous inner classes for the Function stuff, and that got things working. I'm fully following the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim. Is there a clean way to refactor out the custom Function classes, such as the one for getting a db connection or mapping ResultSet data to your own POJOs, rather than doing it all inline? On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org wrote: Is sc there a SparkContext or a JavaSparkContext?
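A minimal sketch of Cody's suggestion, in Scala for brevity; the connection URL and the FOO/ID names follow the examples in this thread, and everything else is a plain JDBC lookup of the real key range before building the RDD:

import java.sql.DriverManager

// Query the actual key range first so the JdbcRDD partitions are balanced.
val url = "jdbc:derby:target/JavaJdbcRDDSuiteDb" // as in the snippet quoted below
val conn = DriverManager.getConnection(url)
val rs = conn.createStatement().executeQuery("SELECT MIN(ID), MAX(ID) FROM FOO")
rs.next()
val (lowerBound, upperBound) = (rs.getLong(1), rs.getLong(2))
rs.close()
conn.close()
// lowerBound and upperBound can then be passed to JdbcRDD.create
// in place of the hard-coded 1 and 100 shown below.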
The compilation error seems to indicate the former, but JdbcRDD.create expects the latter On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I have tried that as well, I get a compile error -- [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for create(SparkContext,<anonymous ConnectionFactory>,String,int,int,int,<anonymous Function<ResultSet,Integer>>) The code is a copy and paste:

JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
    sc,
    new JdbcRDD.ConnectionFactory() {
        public Connection getConnection() throws SQLException {
            return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
        }
    },
    "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
    1, 100, 1,
    new Function<ResultSet, Integer>() {
        public Integer call(ResultSet r) throws Exception {
            return r.getInt(1);
        }
    });

The other thing I've tried was to define a static class locally for GetConnection and use the JdbcRDD constructor. This got around the compile issues but blew up at runtime with NoClassDefFoundError: scala/runtime/AbstractFunction0 !

JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
    sc,
    (AbstractFunction0<Connection>) new DbConn(), // had to cast or a
Re: SparkSQL + Tableau Connector
Great, glad it worked out! From: Todd Nist Date: Thursday, February 19, 2015 at 9:19 AM To: Silvio Fiorito Cc: user@spark.apache.org Subject: Re: SparkSQL + Tableau Connector Hi Silvio, I got this working today using your suggestion with the Initial SQL and a Custom Query. See here for details: http://stackoverflow.com/questions/28403664/connect-to-existing-hive-in-intellij-using-sbt-as-build/28608608#28608608 It is not ideal as I need to write a custom query, but it does work for now. I also have it working by doing a saveAsTable on the ingested data, which stores the reference into the metastore for access via the thrift server. Thanks for the help. -Todd On Wed, Feb 11, 2015 at 8:41 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Hey Todd, I don’t have an app to test against the thrift server; are you able to define custom SQL without using Tableau’s schema query? I guess it’s not possible to just use SparkSQL temp tables; you may have to use permanent Hive tables that are actually in the metastore so Tableau can discover them in the schema. In that case you will either have to generate the Hive tables externally from Spark or use Spark to process the data and save them using a HiveContext. From: Todd Nist Date: Wednesday, February 11, 2015 at 7:53 PM To: Andrew Lee Cc: Arush Kharbanda, user@spark.apache.org Subject: Re: SparkSQL + Tableau Connector First, sorry for the long post. So back to Tableau and Spark SQL, I'm still missing something. TL;DR To get the Spark SQL temp table associated with the metastore, are there additional steps required beyond doing the below? Initial SQL on connection:

create temporary table test using org.apache.spark.sql.json options (path '/data/json/*');
cache table test;

I feel like I'm missing a step of associating the Spark SQL table with the metastore; do I need to actually save it in some fashion? I'm trying to avoid saving to hive if possible. Details: I configured the hive-site.xml and placed it in $SPARK_HOME/conf. It looks like this, thanks Andrew and Arush for the assistance:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.semantic.analyzer.factory.impl</name>
    <value>org.apache.hcatalog.cli.HCatSemanticAnalyzerFactory</value>
  </property>
  <property>
    <name>hive.metastore.sasl.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
  </property>
  <property>
    <name>hive.server2.enable.doAs</name>
    <value>true</value>
  </property>
  <!--
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
  </property>
  -->
  <property>
    <name>hive.warehouse.subdir.inherit.perms</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>MySQL JDBC driver class</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hiveuser</value>
  </property>
</configuration>

When I start the server it looks fine:

$ ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --hiveconf hive.server2.thrift.bind.host radtech.io --master spark://radtech.io:7077 --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.34-bin.jar
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
radtech:spark tnist$ tail -f logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
15/02/11 19:15:24 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150211191524-0008/1 on hostPort 192.168.1.2:50851 with 2 cores, 512.0 MB RAM
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/0 is now LOADING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/1 is now LOADING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/0 is now RUNNING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/1 is now RUNNING
15/02/11 19:15:24 INFO NettyBlockTransferService: Server created on 50938
15/02/11
Re: storing MatrixFactorizationModel (pyspark)
Yep. The matrix model has two RDDs representing the decomposed matrix. You can save these to disk and reuse them. On Thu, Feb 19, 2015 at 2:19 AM Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, when getting the model out of ALS.train it would be beneficial to store it (to disk) so the model can be reused later for any following predictions. I am using pyspark and I had no luck pickling it either using the standard pickle module or even dill. Does anyone have a solution for this (note it is pyspark)? thank you, Antony.
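To make the suggestion concrete, a hedged sketch in Scala (the same idea applies from pyspark): userFeatures and productFeatures are the two factor RDDs of a MatrixFactorizationModel; the HDFS paths and the example IDs 42 and 7 are assumptions:

// Persist the two factor RDDs that make up the trained model.
model.userFeatures.saveAsObjectFile("hdfs:///models/als/userFeatures")
model.productFeatures.saveAsObjectFile("hdfs:///models/als/productFeatures")

// Later, in another job, load them back.
val userF = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/userFeatures")
val prodF = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/productFeatures")

// A prediction for (user, product) is the dot product of the two factor vectors;
// 42 and 7 are arbitrary example IDs.
val score = userF.lookup(42).head.zip(prodF.lookup(7).head)
  .map { case (a, b) => a * b }.sum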
Re: loads of memory still GC overhead limit exceeded
Hi Antony - you are seeing a problem that I ran into. The underlying issue is your default parallelism setting. What's happening is that within ALS certain RDD operations end up changing the number of partitions you have of your data. For example, if you start with an RDD of 300 partitions, then unless default parallelism is set, you'll eventually get an RDD with something like 20 partitions as the algorithm executes. Consequently, your giant data set is now stored across a much smaller number of partitions, so each partition is huge. Then, when a shuffle requires serialization, you run out of heap space trying to serialize it. The solution should be as simple as setting the default parallelism setting (a sketch follows after the quoted logs below). This is referenced in a JIRA I can't find at the moment. On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid wrote: now with reverted spark.shuffle.io.preferDirectBufs (to true) getting again GC overhead limit exceeded:

=== spark stdout ===
15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn log (same) ===
15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn nodemanager ===
2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 143
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08

Antony.

On Thursday, 19 February 2015, 11:54, Antony Mayi antonym...@yahoo.com.INVALID wrote: it is from within the ALS.trainImplicit() call. btw.
the exception varies between this GC overhead limit exceeded and Java heap space (which I guess is just different outcome of same problem). just tried another run and here are the logs (filtered) - note I tried this run with spark.shuffle.io.preferDirectBufs=false so this might be slightly different issue from my previous case (going to revert now): === spark stdout === 15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms 15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms 15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost) 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost) 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost) 15/02/19 10:16:44 WARN
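A minimal sketch of the fix Paul describes above; the value 300 is an assumption and should match the partitioning of your input data:

import org.apache.spark.{SparkConf, SparkContext}

// Pin default parallelism so ALS's internal shuffles keep a sensible
// partition count instead of collapsing into a handful of huge partitions.
val conf = new SparkConf()
  .setAppName("ALS")
  .set("spark.default.parallelism", "300")
val sc = new SparkContext(conf)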
Why is RDD lookup slow?
Hi, I am doing lookups on cached RDDs of [(Int, String)], and I noticed that the lookup is relatively slow, 30-100 ms. I even tried this on one machine with a single partition, but no difference! The RDDs are not large at all, 3-30 MB. Is this expected behaviour? Should I use other data structures, like a HashMap, to keep the data and look it up there, and use Broadcast to send a copy to all machines? best, /Shahab
Re: Why is RDD lookup slow?
RDDs are not Maps. lookup() does a linear scan -- parallel by partition, but still linear. Yes, it is not supposed to be an O(1) lookup data structure. It'd be much nicer to broadcast the relatively small data set as a Map and look it up fast, locally. On Thu, Feb 19, 2015 at 3:29 PM, shahab shahab.mok...@gmail.com wrote: Hi, I am doing lookup on cached RDDs [(Int,String)], and I noticed that the lookup is relatively slow 30-100 ms ?? I even tried this on one machine with single partition, but no difference! The RDDs are not large at all, 3-30 MB. Is this expected behaviour? should I use other data structures, like HashMap to keep data and look up it there and use Broadcast to send a copy to all machines? best, /Shahab - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
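A minimal sketch of the broadcast-a-Map approach Sean describes, assuming rdd is the small RDD[(Int, String)] and queries is an RDD of keys to resolve:

// Collect the small RDD to the driver as a Map and broadcast it once;
// each lookup is then a local O(1) hash probe instead of a cluster-wide scan.
val table: Map[Int, String] = rdd.collect().toMap
val bcast = sc.broadcast(table)

val resolved = queries.map(id => (id, bcast.value.get(id)))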
Re: Why is RDD lookup slow?
Hi Shahab - if your data structures are small enough, a broadcasted Map will provide faster lookup. Lookup within an RDD is an O(m) operation, where m is the size of the partition. For RDDs with multiple partitions, executors can operate on them in parallel, so you get some improvement for larger RDDs. On Thu, Feb 19, 2015 at 7:31 AM shahab shahab.mok...@gmail.com wrote: Hi, I am doing lookup on cached RDDs [(Int,String)], and I noticed that the lookup is relatively slow 30-100 ms ?? I even tried this on one machine with single partition, but no difference! The RDDs are not large at all, 3-30 MB. Is this expected behaviour? should I use other data structures, like HashMap to keep data and look up it there and use Broadcast to send a copy to all machines? best, /Shahab
Re: spark-sql problem with textfile separator
For your case, I think you can use a trick: separate on "," (quote, comma, quote) instead of a bare ",". You can refer to the following code snippet:

val people = sc.textFile("examples/src/main/resources/data.csv")
  .map(x => x.substring(1, x.length - 1).split("\",\""))
  .map(p => List(p(0), p(1), p(2)))

On Feb 19, 2015, at 10:02 PM, Francesco Bonamente francescoboname...@gmail.com wrote: Hi Yanbo, unfortunately all csv files contain commas inside some columns and I can't change the structure. How can I work with this kind of textfile and spark-sql? Thank you again 2015-02-19 14:38 GMT+01:00 Yanbo Liang hackingda...@gmail.com: This is because each line will be separated into 4 columns instead of 3 columns. If you want to use a comma to separate different columns, no column will be allowed to include commas. 2015-02-19 18:12 GMT+08:00 sparkino francescoboname...@gmail.com: Hello everybody, I'm quite new to Spark and Scala as well, and I was trying to analyze some csv data via spark-sql. My csv file contains data like this Following the example at this link below https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection I have this code: When I try to fetch val name_address, the second field (address) does not contain all the text. The issue is probably the comma: it is at the same time the split character and a string value of the third column (address) of the csv file. How can I handle this? Thank you in advance Sparkino -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-problem-with-textfile-separator-tp21718.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
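An alternative sketch, under the assumption that every embedded comma sits inside double-quoted fields with no escaped quotes: split on commas that are followed by an even number of quote characters, i.e. commas outside quotes. The sample line is made up for illustration:

// Splits only on commas outside double quotes.
val splitCsv = (line: String) =>
  line.split(""",(?=(?:[^"]*"[^"]*")*[^"]*$)""")

val cols = splitCsv("""John,"12 Main St, Springfield",555-1234""")
// cols: Array(John, "12 Main St, Springfield", 555-1234)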
Re: bulk writing to HDFS in Spark Streaming?
There was already a thread around this; if I understood your question correctly, you can go through it: https://mail-archives.apache.org/mod_mbox/spark-user/201502.mbox/%3ccannjawtrp0nd3odz-5-_ya351rin81q-9+f2u-qn+vruqy+...@mail.gmail.com%3E Thanks Best Regards On Thu, Feb 19, 2015 at 8:16 PM, Chico Qi qijic...@gmail.com wrote: Hi all, In Spark Streaming I want to use DStream.saveAsTextFiles with bulk writes, because the normal saveAsTextFiles cannot finish within the configured batch interval. Maybe a common pool of writers or another dedicated worker for bulk writing? Thanks! B/R Jichao
Re: No suitable driver found error, Create table in hive from spark sql
Hi Dhimant, I believe it will work if you change your spark-shell invocation to pass --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.34-bin.jar instead of putting the jar in --jars. -Todd On Wed, Feb 18, 2015 at 10:41 PM, Dhimant dhimant84.jays...@gmail.com wrote: Found a solution from one of the posts found on the internet. I updated spark/bin/compute-classpath.sh and added the database connector jar into the classpath. CLASSPATH=$CLASSPATH:/data/mysql-connector-java-5.1.14-bin.jar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-suitable-driver-found-error-Create-table-in-hive-from-spark-sql-tp21714p21715.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Tableau beta connector
I am able to connect by doing the following using the Tableau Initial SQL and a custom query:

1. First ingest csv file or json and save out to the file system:

import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv._
val sqlContext = new SQLContext(sc)
val demo = sqlContext.csvFile("/user/data/csv/demo.csv")
demo.toJSON.saveAsTextFile("/user/data/json/test")

2. Start $SPARK_HOME/sbin/start-thriftserver:

./sbin/start-thriftserver.sh --master spark://radtech.io:7077 --total-executor-cores 2 --driver-class-path --hiveconf hive.server2.thrift.port=10001 --hiveconf hive.server2.thrift.bind.host radtech.io

3. Start a tableau session. Create a connection to the thrift server via the SparkSQL (Beta) connector.

4. In Tableau add the following to the “Initial SQL”:

create temporary table test using org.apache.spark.sql.json options (path '/user/data/json/test/*');
cache table test;

5. Refresh the connection. Then select “New Custom SQL” and issue something like:

select * from test;

You will see your table appear. HTH. -Todd On Thu, Feb 19, 2015 at 5:41 AM, ashu ashutosh.triv...@iiitb.org wrote: Hi, I would like you to read my stack overflow answer to this question. If you need more clarification feel free to drop a msg. http://stackoverflow.com/questions/28403664/connect-to-existing-hive-in-intellij-using-sbt-as-build Regards, Ashutosh -- *From:* ganterm [via Apache Spark User List] *Sent:* Thursday, February 19, 2015 12:49 AM *To:* Ashutosh Trivedi (MT2013030) *Subject:* Re: Tableau beta connector Ashutosh, Were you able to figure this out? I am having the exact same question. I think the answer is to use Spark SQL to create/load a table in Hive (e.g. execute the HiveQL CREATE TABLE statement) but I am not sure. Hoping for something more simple than that. Anybody? Thanks! -- View this message in context: Re: Tableau beta connector http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512p21719.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Filtering keys after map+combine
You have the keys before and after reduceByKey. You want to do something based on the key within reduceByKey? It just calls combineByKey, so you can use that method for lower-level control over the merging. Whether it's possible depends, I suppose, on what you mean to filter on. If it's just a property of the key, can't you just filter before reduceByKey? If it's a property of the key's value, don't you need to wait for the reduction to finish? Or are you saying that you know a key should be filtered based on its value partway through the merge? I suppose you can use combineByKey to create a mergeValue function that changes an input type A into some other Option[B]; you output None if your criterion is reached, and your combine function returns None if either argument is None. It doesn't save 100% of the work, but it may mean you only shuffle (key, None) for some keys if the map-side combine already worked out that the key would be filtered. And then after, run a flatMap or something to make Option[B] into B. (A sketch follows after the quoted question.) On Thu, Feb 19, 2015 at 2:21 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, Before I send out the keys for network shuffle, in reduceByKey after map + combine are done, I would like to filter the keys based on some threshold... Is there a way to get the key, value after map+combine stages so that I can run a filter on the keys? Thanks. Deb - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
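A sketch of Sean's Option-based approach; pairs is assumed to be an RDD[(String, Int)], and the criterion (drop a key once its running sum exceeds a threshold) is made up for illustration:

val threshold = 100

val merged = pairs.combineByKey[Option[Int]](
  // createCombiner: start a running sum, or None if one value already trips the criterion
  (v: Int) => if (v > threshold) None else Some(v),
  // mergeValue: switch to None (and stay there) once the criterion is reached
  (c: Option[Int], v: Int) => c.flatMap(s => if (s + v > threshold) None else Some(s + v)),
  // mergeCombiners: None if either side is None, or if the combined sum trips the criterion
  (c1: Option[Int], c2: Option[Int]) =>
    for (a <- c1; b <- c2; if a + b <= threshold) yield a + b
)

// Finally turn Option[Int] back into Int, dropping the filtered keys.
val result = merged.flatMap { case (k, opt) => opt.map(v => (k, v)) }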
Re: issue Running Spark Job on Yarn Cluster
Is this the full stack trace? On Wed, Feb 18, 2015 at 2:39 AM, sachin Singh sachin.sha...@gmail.com wrote: Hi, I want to run my spark job in Hadoop yarn cluster mode. I am using the below command -

spark-submit --master yarn-cluster --driver-memory 1g --executor-memory 1g --executor-cores 1 --class com.dc.analysis.jobs.AggregationJob sparkanalitic.jar param1 param2 param3

I am getting the error below; kindly suggest what's going wrong. Is the command proper or not? Thanks in advance.

Exception in thread "main" org.apache.spark.SparkException: Application finished with failed status
at org.apache.spark.deploy.yarn.ClientBase$class.run(ClientBase.scala:509)
at org.apache.spark.deploy.yarn.Client.run(Client.scala:35)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:139)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Harshvardhan Chauhan* | Software Engineer *GumGum* http://www.gumgum.com/ | *Ads that stick* 310-260-9666 | ha...@gumgum.com
Re: Filtering keys after map+combine
Hi Sean, This is what I intend to do: are you saying that you know a key should be filtered based on its value partway through the merge? I should use combineByKey... Thanks. Deb On Thu, Feb 19, 2015 at 6:31 AM, Sean Owen so...@cloudera.com wrote: You have the keys before and after reduceByKey. You want to do something based on the key within reduceByKey? it just calls combineByKey, so you can use that method for lower-level control over the merging. Whether it's possible depends I suppose on what you mean to filter on. If it's just a property of the key, can't you just filter before reduceByKey? If it's a property of the key's value, don't you need to wait for the reduction to finish? or are you saying that you know a key should be filtered based on its value partway through the merge? I suppose you can use combineByKey to create a mergeValue function that changes an input type A into some other Option[B]; you output None if your criteria is reached, and your combine function returns None if either argument is None? it doesn't save 100% of the work but it may mean you only shuffle (key,None) for some keys if the map-side combine already worked out that the key would be filtered. And then after, run a flatMap or something to make Option[B] into B. On Thu, Feb 19, 2015 at 2:21 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, Before I send out the keys for network shuffle, in reduceByKey after map + combine are done, I would like to filter the keys based on some threshold... Is there a way to get the key, value after map+combine stages so that I can run a filter on the keys ? Thanks. Deb
Spark 1.2.1: ClassNotFoundException when running hello world example in scala 2.11
I'm having an issue with spark 1.2.1 and scala 2.11. I detailed the symptoms in this stackoverflow question. http://stackoverflow.com/questions/28612837/spark-classnotfoundexception-when-running-hello-world-example-in-scala-2-11 Has anyone experienced anything similar? Thank you!
Filter data from one RDD based on data from another RDD
Hi, I have two RDD's with csv data as below:

RDD-1
101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb43,19229261643
101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9229261645
101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,9229261647
101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d8,791191603
101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb42,19229261643
101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,290542385
101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,922926164

RDD-2
101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d9,7911160
101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,2954238
101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9226164
101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,92292164
101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,9226164
101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb40,929164
101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb39,26164

I need to filter RDD-2 to include only those records where the first column value in RDD-2 matches any of the first column values in RDD-1. Currently, I am broadcasting the first column values from RDD-1 as a set and then filtering RDD-2 based on that set.

val rdd1broadcast = sc.broadcast(rdd1.map { uu => uu.split(",")(0) }.collect().toSet)
val rdd2filtered = rdd2.filter { h => rdd1broadcast.value.contains(h.split(",")(0)) }

This will result in data with first column 101970_5854301838 (last two records) being filtered out from RDD-2. Is this the best way to accomplish this? I am worried that for large data volumes, the broadcast step may become an issue. Appreciate any other suggestion. --- Thanks Himanish
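If the key set ever grows too large to broadcast comfortably, a hedged alternative is to filter via a join, which shuffles the data instead of shipping the whole set to every executor:

// Key both RDDs by the first CSV column and keep only matching rows.
val keys1 = rdd1.map(_.split(",")(0)).distinct().map(k => (k, ()))
val rdd2filtered = rdd2
  .map(line => (line.split(",")(0), line))
  .join(keys1)
  .map { case (_, (line, _)) => line }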
Incorrect number of records after left outer join (I think)
Consider the following left outer join potentialDailyModificationsRDD = reducedDailyPairRDD.leftOuterJoin(baselinePairRDD).partitionBy(new HashPartitioner(1024)).persist(StorageLevel.MEMORY_AND_DISK_SER()); Below are the record counts for the RDDs involved Number of records for reducedDailyPairRDD: 2565206 Number of records for baselinePairRDD: 56102812 Number of records for potentialDailyModificationsRDD: 2570115 Below are the partitioners for the RDDs involved. Partitioner for reducedDailyPairRDD: Some(org.apache.spark.HashPartitioner@400) Partitioner for baselinePairRDD: Some(org.apache.spark.HashPartitioner@400) Partitioner for potentialDailyModificationsRDD: Some(org.apache.spark.HashPartitioner@400) I realize in the above statement that the .partitionBy is probably not needed as the underlying RDDs used in the left outer join are already hash partitioned. My question is how the resulting RDD (potentialDailyModificationsRDD) can end up with more records than reducedDailyPairRDD. I would think the number of records in potentialDailyModificationsRDD should be 2565206 instead of 2570115. Am I missing something or is this possibly a bug? I'm using Apache Spark 1.2 on a stand-alone cluster on ec2. To get the counts for the records, I'm using the .count() for the RDD. Thanks. Darin. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
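One plausible explanation worth checking here: leftOuterJoin emits one output row per matching (left, right) key pair, so duplicate keys on the right side (baselinePairRDD) can legitimately push the output count above the left side's count. A tiny sketch of the semantics:

val left  = sc.parallelize(Seq((1, "a")))
val right = sc.parallelize(Seq((1, "x"), (1, "y")))
// One left row joined to two right rows yields two output rows.
left.leftOuterJoin(right).count()  // == 2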
Re: Spark Streaming and message ordering
Kafka ordering is guaranteed on a per-partition basis. The high-level consumer api as used by the spark kafka streams prior to 1.3 will consume from multiple kafka partitions, thus not giving any ordering guarantees. The experimental direct stream in 1.3 uses the simple consumer api, and there is a 1:1 correspondence between spark partitions and kafka partitions. So you will get deterministic ordering, but only on a per-partition basis. On Thu, Feb 19, 2015 at 11:31 PM, Neelesh neele...@gmail.com wrote: I had a chance to talk to TD today at the Strata+Hadoop Conf in San Jose. We talked a bit about this after his presentation - the short answer is spark streaming does not guarantee any sort of ordering (within batches, across batches). One would have to use updateStateByKey to collect the events and sort them based on some attribute of the event. But TD said message ordering is a frequently requested feature recently and is getting on his radar. I went through the source code and there does not seem to be any architectural/design limitation to support this. (JobScheduler, JobGenerator are a good starting point to see how stuff works under the hood.) Overriding DStream#compute and using a streaminglistener looks like a simple way of ensuring ordered execution of batches within a stream. But this would be a partial solution, since ordering within a batch needs some more work that I don't understand fully yet. Side note: My custom receiver polls the metricsservlet once in a while to decide whether jobs are getting done fast enough and throttle/relax pushing data into receivers based on the numbers provided by metricsservlet. I had to do this because out-of-the-box rate limiting right now is static and cannot adapt to the state of the cluster. thnx -neelesh On Wed, Feb 18, 2015 at 4:13 PM, jay vyas jayunit100.apa...@gmail.com wrote: This is a *fantastic* question. The idea of how we identify individual things in multiple DStreams is worth looking at. The reason being, that you can then fine tune your streaming job, based on the RDD identifiers (i.e. are the timestamps from the producer correlating closely to the order in which RDD elements are being produced)? If *NO* then you need to (1) dial up throughput on producer sources or else (2) increase cluster size so that spark is capable of evenly handling load. You can't decide to do (1) or (2) unless you can track when the streaming elements are being converted to RDDs by spark itself. On Wed, Feb 18, 2015 at 6:54 PM, Neelesh neele...@gmail.com wrote: There does not seem to be a definitive answer on this. Every time I google for message ordering, the only relevant thing that comes up is this - http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html . With a kafka receiver that pulls data from a single kafka partition of a kafka topic, are individual messages in the microbatch in the same order as the kafka partition? Are successive microbatches originating from a kafka partition executed in order? Thanks! -- jay vyas
Re: Re: Spark streaming doesn't print output when working with standalone master
Thanks Akhil, you are right. I checked and found that I have only 1 core allocated to the program. I am running on a virtual machine and only allocated one processor to it (1 core per processor), so even though I have specified --total-executor-cores 3 in the submit script, the application will still only be allocated one core. This leads me to another question: Although I have only one core, if I specify the master and executor as --master local[3] --executor-memory 512M --total-executor-cores 3, it works. Since I have only one core, why does this work? bit1...@163.com From: Akhil Das Date: 2015-02-20 15:13 To: bit1...@163.com CC: user Subject: Re: Spark streaming doesn't print output when working with standalone master While running the program, go to your cluster's webUI (that runs on 8080, probably at hadoop.master:8080) and see how many cores are allocated to the program; it should be >= 2 for the stream to get processed. Thanks Best Regards On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the spark streaming log analysis reference application provided by Databricks at https://github.com/databricks/reference-apps/tree/master/logs_analyzer When I deploy the code to the standalone cluster, there is no output at all with the following shell script, which means the windowDStream has 0 RDDs.

./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar

But when I change --master to --master local[3], the program starts to work fine. Can anyone give some advice? Thanks!

./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master local[3] --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar

object LogAnalyzerStreaming {
  val WINDOW_LENGTH = new Duration(12 * 1000)
  val SLIDE_INTERVAL = new Duration(6 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
    val logLinesDStream = streamingContext.socketTextStream("localhost", )
    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access com.databricks.app.logs received in this time interval")
      } else {
        // Calculate statistics based on the content size.
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max))
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Re: How to pass parameters to a spark-jobserver Scala class?
Hi Sasi, I am not sure about Vaadin, but with simple googling you can find many articles on how to pass JSON parameters over HTTP: http://stackoverflow.com/questions/21404252/post-request-send-json-data-java-httpurlconnection You can also try Finagle, a fully fault-tolerant framework by Twitter. Regards, Vasu C -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-parameters-to-a-spark-jobserver-Scala-class-tp21671p21727.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
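For the spark-jobserver side, a hedged sketch of how POSTed parameters typically arrive in a job as a Typesafe Config; the job object and the input.string key are assumptions modeled on the jobserver's word-count example, so check them against your jobserver version:

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import spark.jobserver.{SparkJob, SparkJobValid, SparkJobValidation}

object ParamEchoJob extends SparkJob {
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {
    // Whatever the caller POSTs in the request body is available here.
    val input = config.getString("input.string") // assumes the caller sent input.string
    sc.parallelize(input.split(" ").toSeq).countByValue()
  }
}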
RE: using a database connection pool to write data into an RDBMS from a Spark application
Hi Kelvin, Yes. I am creating an uber jar with the Postgres driver included, but nevertheless tried both --jars and --driver-class-path flags. It didn’t help. Interestingly, I can’t use BoneCP even in the driver program when I run my application with spark-submit. I am getting the same exception when the application initializes BoneCP before creating the SparkContext. It looks like Spark is loading a different version of the Postgres JDBC driver than the one that I am linking. Mohammed From: Kelvin Chu [mailto:2dot7kel...@gmail.com] Sent: Thursday, February 19, 2015 7:56 PM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Hi Mohammed, Did you use --jars to specify your jdbc driver when you submitted your job? Take a look at this link: http://spark.apache.org/docs/1.2.0/submitting-applications.html Hope this helps! Kelvin On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller moham...@glassbeam.com wrote: Hi – I am trying to use BoneCP (a database connection pooling library) to write data from my Spark application to an RDBMS. The database inserts are inside a foreachPartition code block. I am getting this exception when the code tries to insert data using BoneCP: java.sql.SQLException: No suitable driver found for jdbc:postgresql://hostname:5432/dbname I tried explicitly loading the Postgres driver on the worker nodes by adding the following line inside the foreachPartition code block: Class.forName("org.postgresql.Driver") It didn’t help. Has anybody been able to get a database connection pool library to work with Spark? If you got it working, can you please share the steps? Thanks, Mohammed
Re: Failure on a Pipe operation
It appears that the file paths are different when running spark in local and cluster mode. When running spark without --master, the paths to the pipe command are relative to the local machine. When running spark with --master, the paths to the pipe command are relative to ./ This is what finally worked. I still don't understand why. It's not documented, and a cursory glance at the source code didn't help.

sc.addFile("./ProgramSIM");
...
JavaRDD<String> output = data.pipe(SparkFiles.get("./ProgramSIM"));

On Thu, Feb 19, 2015 at 5:41 PM, Imran Rashid iras...@cloudera.com wrote: The error msg is telling you the exact problem: it can't find ProgramSIM, the thing you are trying to run. Lost task 3520.3 in stage 0.0 (TID 11, compute3.research.dev): java.io.IOException: Cannot run program "ProgramSIM": error=2, No such file or directory On Thu, Feb 19, 2015 at 5:52 PM, athing goingon athinggoin...@gmail.com wrote: Hi, I'm trying to figure out why the following job is failing on a pipe http://pastebin.com/raw.php?i=U5E8YiNN With this exception: http://pastebin.com/raw.php?i=07NTGyPP Any help is welcome. Thank you.
Re: using a database connection pool to write data into an RDBMS from a Spark application
Hi Mohammed, Did you use --jars to specify your jdbc driver when you submitted your job? Take a look at this link: http://spark.apache.org/docs/1.2.0/submitting-applications.html Hope this helps! Kelvin On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller moham...@glassbeam.com wrote: Hi – I am trying to use BoneCP (a database connection pooling library) to write data from my Spark application to an RDBMS. The database inserts are inside a foreachPartition code block. I am getting this exception when the code tries to insert data using BoneCP: java.sql.SQLException: No suitable driver found for jdbc:postgresql://hostname:5432/dbname I tried explicitly loading the Postgres driver on the worker nodes by adding the following line inside the foreachPartition code block: Class.forName("org.postgresql.Driver") It didn’t help. Has anybody been able to get a database connection pool library to work with Spark? If you got it working, can you please share the steps? Thanks, Mohammed
Spark streaming doesn't print output when working with standalone master
Hi, I am trying the spark streaming log analysis reference application provided by Databricks at https://github.com/databricks/reference-apps/tree/master/logs_analyzer When I deploy the code to the standalone cluster, there is no output at all with the following shell script, which means the windowDStream has 0 RDDs.

./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar

But when I change --master to --master local[3], the program starts to work fine. Can anyone give some advice? Thanks!

./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master local[3] --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar

object LogAnalyzerStreaming {
  val WINDOW_LENGTH = new Duration(12 * 1000)
  val SLIDE_INTERVAL = new Duration(6 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
    val logLinesDStream = streamingContext.socketTextStream("localhost", )
    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access com.databricks.app.logs received in this time interval")
      } else {
        // Calculate statistics based on the content size.
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max))
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Re: Tableau beta connector
Thanks Todd, great stuff :) Regards, Ashu From: Todd Nist tsind...@gmail.com Sent: Thursday, February 19, 2015 7:46 PM To: Ashutosh Trivedi (MT2013030) Cc: user@spark.apache.org Subject: Re: Tableau beta connector I am able to connect by doing the following using the Tableau Initial SQL and a custom query:

1. First ingest csv file or json and save out to the file system:

import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv._
val sqlContext = new SQLContext(sc)
val demo = sqlContext.csvFile("/user/data/csv/demo.csv")
demo.toJSON.saveAsTextFile("/user/data/json/test")

2. Start $SPARK_HOME/sbin/start-thriftserver:

./sbin/start-thriftserver.sh --master spark://radtech.io:7077 --total-executor-cores 2 --driver-class-path --hiveconf hive.server2.thrift.port=10001 --hiveconf hive.server2.thrift.bind.host radtech.io

3. Start a tableau session. Create a connection to the thrift server via the SparkSQL (Beta) connector.

4. In Tableau add the following to the "Initial SQL":

create temporary table test using org.apache.spark.sql.json options (path '/user/data/json/test/*');
cache table test;

5. Refresh the connection. Then select "New Custom SQL" and issue something like:

select * from test;

You will see your table appear. HTH. -Todd On Thu, Feb 19, 2015 at 5:41 AM, ashu ashutosh.triv...@iiitb.org wrote: Hi, I would like you to read my stack overflow answer to this question. If you need more clarification feel free to drop a msg. http://stackoverflow.com/questions/28403664/connect-to-existing-hive-in-intellij-using-sbt-as-build Regards, Ashutosh From: ganterm [via Apache Spark User List] Sent: Thursday, February 19, 2015 12:49 AM To: Ashutosh Trivedi (MT2013030) Subject: Re: Tableau beta connector Ashutosh, Were you able to figure this out? I am having the exact same question. I think the answer is to use Spark SQL to create/load a table in Hive (e.g. execute the HiveQL CREATE TABLE statement) but I am not sure. Hoping for something more simple than that. Anybody? Thanks! View this message in context: Re: Tableau beta connector http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512p21719.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Caching RDD
Hi, I have an HDFS file of size 598MB. I create an RDD over this file and cache it in RAM in a 7-node cluster with 2G RAM each. I find that each partition gets replicated three or even four times in the cluster, even without me specifying it in code. There are 5 total partitions for the RDD created, but 17 cached partitions. I want to know if Spark replicates RDDs automatically whenever there is scope (availability of extra resources)? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Caching-RDD-tp21728.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark Performance on Yarn
I'm a bit new to Spark, but had a question on performance. I suspect a lot of my issue is due to tuning and parameters. I have a Hive external table on this data, and queries against it run in minutes.

The Job:
+ 40gb of avro events on HDFS (100 million+ avro events)
+ Read in the files from HDFS and dedupe events by key (mapToPair then a reduceByKey)
+ RDD returned and persisted (disk and memory)
+ Then passed to a job that takes the RDD, does a mapToPair of new object data, then a reduceByKey and foreachPartition to do work

The issue: When I run this on my environment on Yarn, this takes 20+ hours. Running on yarn we see the first stage run to build the deduped RDD, but then when the next stage starts, things fail and data is lost. This results in stage 0 starting over and over and just dragging it out.

Errors I see in the driver logs:

ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on X: remote Akka client disassociated
15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1 (TID 1335,): FetchFailed(BlockManagerId(3, i, 33958), shuffleId=1, mapId=162, reduceId=0, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to X/X:33958

Also we see this, but I'm suspecting this is because the previous stage fails and the next one starts:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1

Cluster: 5 machines, each 2 core, 8gb machines

Spark-submit command:
spark-submit --class com.myco.SparkJob \
  --master yarn \
  /tmp/sparkjob.jar \

Any thoughts on where to look, how to start approaching this problem, or more data points to present? Thanks.

Code for the job:

JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>) context.newAPIHadoopRDD(
    context.hadoopConfiguration(),
    AvroKeyInputFormat.class,
    AvroKey.class,
    NullWritable.class
).keys())
    .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
    .filter(key -> Optional.ofNullable(key.getStepEventKey()).isPresent())
    .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
    .reduceByKey((analyticsEvent1, analyticsEvent2) -> analyticsEvent1)
    .map(tuple -> tuple._1());

events.persist(StorageLevel.MEMORY_AND_DISK_2());

events.mapToPair(event -> {
    return new Tuple2<T, RunningAggregates>(
        keySelector.select(event),
        new RunningAggregates(
            Optional.ofNullable(event.getVisitors()).orElse(0L),
            Optional.ofNullable(event.getImpressions()).orElse(0L),
            Optional.ofNullable(event.getAmount()).orElse(0.0D),
            Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D)));
})
    .reduceByKey((left, right) -> left.add(right))
    .foreachPartition(dostuff);

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming and message ordering
I had a chance to talk to TD today at the Strata+Hadoop Conf in San Jose. We talked a bit about this after his presentation - the short answer is spark streaming does not guarantee any sort of ordering (within batches, across batches). One would have to use updateStateByKey to collect the events and sort them based on some attribute of the event (a sketch of this idea follows at the end of this message). But TD said message ordering is a frequently requested feature recently and is getting on his radar. I went through the source code and there does not seem to be any architectural/design limitation to support this. (JobScheduler, JobGenerator are a good starting point to see how stuff works under the hood.) Overriding DStream#compute and using a streaminglistener looks like a simple way of ensuring ordered execution of batches within a stream. But this would be a partial solution, since ordering within a batch needs some more work that I don't understand fully yet. Side note: My custom receiver polls the metricsservlet once in a while to decide whether jobs are getting done fast enough and throttle/relax pushing data into receivers based on the numbers provided by metricsservlet. I had to do this because out-of-the-box rate limiting right now is static and cannot adapt to the state of the cluster. thnx -neelesh On Wed, Feb 18, 2015 at 4:13 PM, jay vyas jayunit100.apa...@gmail.com wrote: This is a *fantastic* question. The idea of how we identify individual things in multiple DStreams is worth looking at. The reason being, that you can then fine tune your streaming job, based on the RDD identifiers (i.e. are the timestamps from the producer correlating closely to the order in which RDD elements are being produced)? If *NO* then you need to (1) dial up throughput on producer sources or else (2) increase cluster size so that spark is capable of evenly handling load. You can't decide to do (1) or (2) unless you can track when the streaming elements are being converted to RDDs by spark itself. On Wed, Feb 18, 2015 at 6:54 PM, Neelesh neele...@gmail.com wrote: There does not seem to be a definitive answer on this. Every time I google for message ordering, the only relevant thing that comes up is this - http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html . With a kafka receiver that pulls data from a single kafka partition of a kafka topic, are individual messages in the microbatch in the same order as the kafka partition? Are successive microbatches originating from a kafka partition executed in order? Thanks!
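To make the updateStateByKey idea concrete, a hedged sketch; the Event class, its ts field, and the eventsByKey DStream[(String, Event)] are all made up for illustration:

case class Event(ts: Long, payload: String)

// Accumulate each key's events and keep them sorted by the event timestamp;
// downstream code can then consume them in timestamp order.
val ordered = eventsByKey.updateStateByKey[Seq[Event]] {
  (newEvents: Seq[Event], state: Option[Seq[Event]]) =>
    Some((state.getOrElse(Seq.empty[Event]) ++ newEvents).sortBy(_.ts))
}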
Re: Learning GraphX Questions
Hi, Vertices are simply hash-partitioned by spark.HashPartitioner, so you can easily calculate partition ids yourself. Also, you can run the following lines to check the ids:

import org.apache.spark.graphx._

graph.vertices.mapPartitionsWithIndex { (pid, iter) =>
  val vids = Array.newBuilder[VertexId]
  for (d <- iter) vids += d._1
  Iterator((pid, vids.result))
}
  .map(d => s"PID:${d._1} IDs:${d._2.toSeq.toString}")
  .collect
  .foreach(println)

On Thu, Feb 19, 2015 at 12:31 AM, Matthew Bucci mrbucci...@gmail.com wrote: Thanks for all the responses so far! I have started to understand the system more, but I just had another question while I was going along. Is there a way to check the individual partitions of an RDD? For example, if I had a graph with vertices a, b, c, d and it was split into 2 partitions, could I check which vertices belonged in partition 1 and partition 2? Thank You, Matthew Bucci On Fri, Feb 13, 2015 at 10:58 PM, Ankur Dave ankurd...@gmail.com wrote: At 2015-02-13 12:19:46 -0800, Matthew Bucci mrbucci...@gmail.com wrote: 1) How do you actually run programs in GraphX? At the moment I've been doing everything live through the shell, but I'd obviously like to be able to work on it by writing and running scripts. You can create your own projects that build against Spark and GraphX through a Maven dependency [1], then run those applications using the bin/spark-submit script included with Spark [2]. These guides assume you already know how to do this using your preferred build tool (SBT or Maven). In short, here's how to do it with SBT: 1. Install SBT locally (`brew install sbt` on OS X). 2. Inside your project directory, create a build.sbt file listing Spark and GraphX as a dependency, as in [3]. 3. Run `sbt package` in a shell. 4. Pass the JAR in your_project_dir/target/scala-2.10/ to bin/spark-submit. [1] http://spark.apache.org/docs/latest/programming-guide.html#linking-with-spark [2] http://spark.apache.org/docs/latest/submitting-applications.html [3] https://gist.github.com/ankurdave/1fb7234d8affb3a2e4f4 2) Is there a way to check the status of the partitions of a graph? For example, I want to determine for starters if the number of partitions requested are always made, like if I ask for 8 partitions but only have 4 cores what happens? You can look at `graph.vertices` and `graph.edges`, which are both RDDs, so you can do for example: graph.vertices.partitions 3) Would I be able to partition by vertex instead of edges, even if I had to write it myself? I know partitioning by edges is favored in a majority of the cases, but for the sake of research I'd like to be able to do both. If you pass PartitionStrategy.EdgePartition1D, this will partition edges by their source vertices, so all edges with the same source will be co-partitioned, and the communication pattern will be similar to vertex-partitioned (edge-cut) systems like Giraph. 4) Is there a better way to time processes outside of using built-in unix timing through the logs or something? I think the options are Unix timing, log file timestamp parsing, looking at the web UI, or writing timing code within your program (System.currentTimeMillis and System.nanoTime). Ankur -- --- Takeshi Yamamuro
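As a hedged follow-up to the EdgePartition1D point above, a two-line sketch of applying that strategy (the partition count is arbitrary):

import org.apache.spark.graphx._

val byEdge = graph.partitionBy(PartitionStrategy.EdgePartition1D, 8)
byEdge.edges.partitions.length  // confirm how many partitions were actually made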
Re: Spark Streaming and message ordering
Even with the new direct streams in 1.3, isn't it the case that the job *scheduling* follows the partition order, rather than job *execution*? Or is it the case that the stream listens to the job completion event (using a StreamingListener) before scheduling the next batch? To compare with Storm from a message-ordering point of view: unless a tuple is fully processed by the DAG (as defined by spout+bolts), the next tuple does not enter the DAG. On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger c...@koeninger.org wrote: Kafka ordering is guaranteed on a per-partition basis. The high-level consumer api as used by the spark kafka streams prior to 1.3 will consume from multiple kafka partitions, thus not giving any ordering guarantees. The experimental direct stream in 1.3 uses the simple consumer api, and there is a 1:1 correspondence between spark partitions and kafka partitions. So you will get deterministic ordering, but only on a per-partition basis. On Thu, Feb 19, 2015 at 11:31 PM, Neelesh neele...@gmail.com wrote: [earlier message quoted in full above]
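For reference, a hedged sketch of wiring up the 1.3 direct stream Cody describes -- the broker address and topic name are placeholders, and ssc is an existing StreamingContext:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))
// each RDD partition now maps 1:1 to a kafka partition, ordered within itself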
Re: Spark 1.2.1: ClassNotFoundException when running hello world example in scala 2.11
Can you downgrade your scala dependency to 2.10 and give it a try? Thanks Best Regards On Fri, Feb 20, 2015 at 12:40 AM, Luis Solano l...@pixable.com wrote: I'm having an issue with spark 1.2.1 and scala 2.11. I detailed the symptoms in this stackoverflow question. http://stackoverflow.com/questions/28612837/spark-classnotfoundexception-when-running-hello-world-example-in-scala-2-11 Has anyone experienced anything similar? Thank you!
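A hedged build.sbt sketch of that downgrade (artifact versions are assumptions matching the Spark 1.2.1 release in the thread):

// build.sbt -- pin Scala to the 2.10 line that the published Spark 1.2.1 artifacts target
scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided"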
Re: How to diagnose could not compute split errors and failed jobs?
Not quite sure, but this can be the case: one of your executors is stuck in a GC pause while the other one asks it for data, and hence the request times out, ending in that exception. You can try increasing the Akka frame size and the ack wait timeout as follows:

val conf = new SparkConf()
  .set("spark.core.connection.ack.wait.timeout", "600")
  .set("spark.akka.frameSize", "50")

Thanks Best Regards On Fri, Feb 20, 2015 at 6:21 AM, Tim Smith secs...@gmail.com wrote: My streaming app runs fine for a few hours and then starts spewing "Could not compute split, block input-xx-xxx not found" errors. After this, jobs start to fail and batches start to pile up. My question isn't so much about why this error occurs but rather, how do I trace what leads to it? I am using disk+memory for storage, so it shouldn't be a case of data loss resulting from memory overrun.

15/02/18 22:04:49 ERROR JobScheduler: Error running job streaming job 142429705 ms.28
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 247644.0 failed 64 times, most recent failure: Lost task 3.63 in stage 247644.0 (TID 3705290, node-dn1-16-test.abcdefg.com): java.lang.Exception: Could not compute split, block input-28-1424297042500 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks, Tim
Re: Spark streaming doesn't print output when working with standalone master
While running the program, go to your cluster's web UI (it runs on port 8080, probably at hadoop.master:8080) and see how many cores are allocated to the program; it should be >= 2 for the stream to get processed. Thanks Best Regards On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the spark streaming log analysis reference application provided by Databricks at https://github.com/databricks/reference-apps/tree/master/logs_analyzer When I deploy the code to the standalone cluster, there is no output at all with the following shell script, which means the windowDStream has 0 RDDs:

./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar

But, when I change --master to be --master local[3], the program starts to work fine. Can anyone offer some advice? Thanks!

./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master local[3] --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar

object LogAnalyzerStreaming {

  val WINDOW_LENGTH = new Duration(12 * 1000)
  val SLIDE_INTERVAL = new Duration(6 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    val logLinesDStream = streamingContext.socketTextStream("localhost", /* port elided in the original post */)

    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access logs received in this time interval")
      } else {
        // Calculate statistics based on the content size.
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max))
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Re: percentil UDAF in spark 1.2.0
Already fixed: https://github.com/apache/spark/pull/2802 On Thu, Feb 19, 2015 at 3:17 PM, Mohnish Kodnani mohnish.kodn...@gmail.com wrote: Hi, I am trying to use percentile and getting the following error. I am using spark 1.2.0. Does the UDAF percentile exist in that code line, and do I have to do something to get this to work?

java.util.NoSuchElementException: key not found: percentile
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:53)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:220)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:218)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)

Thanks mohnish
Re: percentil UDAF in spark 1.2.0
The percentile UDAF came in across a couple of PRs. Commit f33d55046427b8594fd19bda5fd2214eeeab1a95 reflects the most recent work, I believe. It will be part of the 1.3.0 release very soon: http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-1-3-0-RC1-tt10658.html On Thu, Feb 19, 2015 at 3:39 PM, Mohnish Kodnani mohnish.kodn...@gmail.com wrote: Isn't that PR about being able to pass an array to the percentile function? If I understand this error correctly, it's not able to find the function percentile itself. Also, if I am incorrect and that PR fixes it, is it available in a release? On Thu, Feb 19, 2015 at 3:27 PM, Mark Hamstra m...@clearstorydata.com wrote: Already fixed: https://github.com/apache/spark/pull/2802 On Thu, Feb 19, 2015 at 3:17 PM, Mohnish Kodnani mohnish.kodn...@gmail.com wrote: [original question quoted in full above]
stack map functions in a loop (pyspark)
Hi all, I think I have run into an issue with the lazy evaluation of variables in pyspark. I have the following:

functions = [func1, func2, func3]

for counter in range(len(functions)):
    data = data.map(lambda value: [functions[counter](value)])

It looks like the counter is evaluated when the RDD is computed, so all three mappers are filled in with its last value. Is there any way to force it to be evaluated at the time? (I am aware that I could persist after each step, which sounds a bit of a waste.) thanks,
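For readers hitting the same thing: the usual workaround is to bind the loop variable through a default argument, so each lambda captures its own value rather than the shared closure variable. A hedged sketch (func1/func2/func3 are the placeholders from the post):

functions = [func1, func2, func3]

for counter in range(len(functions)):
    # counter=counter freezes the current value at lambda-definition time
    data = data.map(lambda value, counter=counter: [functions[counter](value)])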
Re: RDD Partition number
What file system are you using? If you use hdfs, the documentation you cited is pretty clear on how partitions are determined. bq. file X replicated on 4 machines I don't think replication factor plays a role w.r.t. partitions. On Thu, Feb 19, 2015 at 8:05 AM, Alessandro Lulli lu...@di.unipi.it wrote: [question quoted in full below]
Implicit ALS with multiple features
Hello, I would like to use the Spark MLlib recommendation filtering library. My goal is to predict what a user would like to buy based on what he bought before. I read in the Spark documentation that Spark supports implicit feedback, but there is no example for this application. Would implicit feedback work for my business case, and how? Also, can ALS accept multiple parameters? Currently I have: (userId, productId, nbPurchased) I would like to add another parameter: (userId, productId, nbPurchased, frequency) Is it possible with ALS? Thank you for your reply
Re: Filtering keys after map+combine
I'm not sure what your use case is, but perhaps you could use mapPartitions to reduce across the individual partitions and apply your filtering. Then you can finish with a reduceByKey. On Thu, Feb 19, 2015 at 9:21 AM, Debasish Das debasish.da...@gmail.com wrote: [question quoted in full below] -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
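A hedged sketch of that suggestion, assuming (String, Int) pairs and an assumed cutoff value: combine locally inside each partition, drop keys whose partial sums fall below the threshold, then finish with reduceByKey.

val threshold = 5  // assumed cutoff, not from the thread
val preCombined = pairs.mapPartitions { iter =>
  val acc = scala.collection.mutable.HashMap.empty[String, Int]
  iter.foreach { case (k, v) => acc(k) = acc.getOrElse(k, 0) + v }
  // filter on per-partition sums before anything goes over the network
  acc.iterator.filter { case (_, sum) => sum >= threshold }
}
val result = preCombined.reduceByKey(_ + _)

Note the filter here sees per-partition (map+combine) sums, not global sums, which matches what the question asks for.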
Re: Spark job fails on cluster but works fine on a single machine
On Feb 19, 2015, at 7:29 PM, Pavel Velikhov pavel.velik...@icloud.com wrote: [original post quoted in full; see below] - Btw I *don't* want to use .coalesce(1,true) in the future as well.
Spark job fails on cluster but works fine on a single machine
I have a simple Spark job that goes out to Cassandra, runs a pipe and stores results:

val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("keyspace", "table")
  .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags"))))
  .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
  .map(r => convertStr(r))
  .coalesce(1, true)
  .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
  //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))

When run on a single machine, everything is fine if I save to an hdfs file or save to Cassandra. When run in cluster neither works: - When saving to file, I get an exception: User class threw exception: Output directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt already exists - When saving to Cassandra, only 4 rows are updated with empty data (I test on a 4-machine Spark cluster) Any hints on how to debug this and where the problem could be? - I delete the hdfs file before running - Would really like the output to hdfs to work, so I can debug - Then it would be nice to save to Cassandra
Re: Unzipping large files and 2GB partition size.
Thanks for your detailed reply Imran. I'm writing this in Clojure (using Flambo, which uses the Java API) but I don't think that's relevant. So here's the pseudocode (sorry, I've not written Scala for a long time):

val rawData = sc.hadoopFile("/dir/to/gzfiles")      // NB multiple files
val parsedFiles = rawData.map(parseFunction)        // can return nil on failure
val filtered = parsedFiles.filter(notNil)
val partitioned = filtered.repartition(100)         // guessed number
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)

val resultA = stuffA(persisted)
val resultB = stuffB(persisted)
val resultC = stuffC(persisted)

So, I think I'm already doing what you suggested. I would have assumed that partition size would be («size of expanded file» / «number of partitions»), in this case with 100 partitions (which I picked out of the air). I wonder whether the «size of expanded file» is actually the size of all concatenated input files (probably about 800 GB)? In that case should I multiply it by the number of files? Or perhaps I'm barking up completely the wrong tree. Joe On 19 February 2015 at 14:44, Imran Rashid iras...@cloudera.com wrote: [message quoted in full below]
RDD Partition number
Hi All, Could you please help me understand how Spark defines the number of partitions of the RDDs if not specified? I found the following in the documentation for files loaded from HDFS: *The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks* What is the rule for files loaded from the file system? For instance, I have a file X replicated on 4 machines. If I load the file X into an RDD, how many partitions are defined and why? Thanks for your help on this Alessandro
Re: Implicit ALS with multiple features
It's shown at http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html It's really no different to use. It's suitable when you have count-like data rather than rating-like data; that's what you have here. I am not sure what you mean about wanting to add frequency too, but no, the model is strictly (user, item, strength). However you can merge many signals into a 'strength' score. On Thu, Feb 19, 2015 at 4:40 PM, poiuytrez guilla...@databerries.com wrote: [question quoted in full above]
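A hedged sketch of folding a purchase count into the Rating strength, per the advice above -- the purchases RDD of (userId, productId, nbPurchased) triples and all hyperparameter values are assumptions:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val ratings = purchases.map { case (userId, productId, nbPurchased) =>
  Rating(userId, productId, nbPurchased.toDouble)  // count-like strength signal
}
val model = ALS.trainImplicit(ratings, 10, 10, 0.01, 1.0)  // rank, iterations, lambda, alpha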
Re: RDD Partition number
By default you will have (file size in MB / 64) partitions. You can also set the number of partitions when you read in a file with sc.textFile, as an optional second parameter. On Thu, Feb 19, 2015 at 8:07 AM Alessandro Lulli lu...@di.unipi.it wrote: [question quoted in full above]
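A hedged one-liner showing that optional argument (path and count are placeholders):

val rdd = sc.textFile("hdfs:///path/to/file", 100)  // ask for at least 100 partitions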
RE: Spark job fails on cluster but works fine on a single machine
When writing to hdfs Spark will not overwrite existing files or directories. You must either manually delete these or use Java's Hadoop FileSystem class to remove them.

-----Original Message-----
From: Pavel Velikhov [pavel.velik...@gmail.com]
Sent: Thursday, February 19, 2015 11:32 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Spark job fails on cluster but works fine on a single machine

[quoted message identical to the original post above]
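A hedged sketch of that FileSystem cleanup before saveAsTextFile -- the path is the one from the thread:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val out = new Path("/tmp/pavel/CassandraPipeTest.txt")
if (fs.exists(out)) fs.delete(out, true)  // true = recursive delete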
Re: RDD Partition number
bq. blocks being 64MB by default in HDFS

In Hadoop 2.1+, the default block size has been increased. See https://issues.apache.org/jira/browse/HDFS-4053 Cheers On Thu, Feb 19, 2015 at 8:32 AM, Ted Yu yuzhih...@gmail.com wrote: [earlier reply quoted in full above]
RE: RDD Partition number
As Ted Yu points out, the default block size is 128MB as of Hadoop 2.1.

-----Original Message-----
From: Ilya Ganelin [ilgan...@gmail.com]
Sent: Thursday, February 19, 2015 12:13 PM Eastern Standard Time
To: Alessandro Lulli; user@spark.apache.org
Cc: Massimiliano Bertolucci
Subject: Re: RDD Partition number

[earlier replies and question quoted in full above]
Filtering keys after map+combine
Hi, Before I send out the keys for network shuffle, in reduceByKey after map + combine are done, I would like to filter the keys based on some threshold... Is there a way to get the key, value after the map+combine stages so that I can run a filter on the keys? Thanks. Deb
Re: Unzipping large files and 2GB partition size.
Hi Joe, The issue is not that you have input partitions that are bigger than 2GB -- it's just that they are getting cached. You can see in the stack trace, the problem is when you try to read data out of the DiskStore: org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) Also, just because you see this: 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:-2) it doesn't *necessarily* mean that this is coming from your map. It can be pretty confusing how your operations on RDDs get turned into stages; it could be a lot more than just your map, and actually, it might not even be your map at all -- some of the other operations you invoke call map underneath the covers. So it's hard to say what is going on here w/out seeing more code. Anyway, maybe you've already considered all this (you did mention the lazy execution of the DAG), but I wanted to make sure. It might help to use rdd.setName() and also to look at rdd.toDebugString. As far as what you can do about this -- it could be as simple as moving your rdd.persist() to after you have compressed and repartitioned your data. eg., I'm blindly guessing you have something like this:

val rawData = sc.hadoopFile(...)
rawData.persist(DISK)
rawData.count()
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
...

change it to something like:

val rawData = sc.hadoopFile(...)
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
repartitionedData.persist(DISK)
repartitionedData.count()
...

The point is, you avoid caching any data until you have ensured that the partitions are small. You might have big partitions before that in rawData, but that is OK. Imran On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote: Thanks for your reply Sean. Looks like it's happening in a map: 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:-2) That's my initial 'parse' stage, done before repartitioning. It reduces the data size significantly so I thought it would be sensible to do before repartitioning, which involves moving lots of data around. That might be a stupid idea in hindsight! So the obvious thing to try would be to try repartitioning before the map as the first transformation. I would have done that if I could be sure that it would succeed or fail quickly. I'm not entirely clear about the lazy execution of transformations in DAG. It could be that the error is manifesting during the mapToPair, but caused by the earlier read from text file stage. Thanks for pointers to those compression formats. I'll give them a go (although it's not trivial to re-encode 200 GB of data on S3, so if I can get this working reasonably with gzip I'd like to). Any advice about whether this error can be worked round with an early partition? Cheers Joe On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote: gzip and zip are not splittable compression formats; bzip and lzo are. Ideally, use a splittable compression format. Repartitioning is not a great solution since it means a shuffle, typically. This is not necessarily related to how big your partitions are. The question is, when does this happen? what operation?
On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote: On the advice of some recent discussions on this list, I thought I would try and consume gz files directly. I'm reading them, doing a preliminary map, then repartitioning, then doing normal spark things. As I understand it, zip files aren't readable in partitions because of the format, so I thought that repartitioning would be the next best thing for parallelism. I have about 200 files, some about 1GB compressed and some over 2GB uncompressed. I'm hitting the 2GB maximum partition size. It's been discussed on this list (topic: 2GB limit for partitions?, tickets SPARK-1476 and SPARK-1391). Stack trace at the end. This happened at 10 hours in (probably when it saw its first file). I can't just re-run it quickly! Does anyone have any advice? Might I solve this by re-partitioning as the first step after reading the file(s)? Or is it effectively impossible to read a gz file that expands to over 2GB? Does anyone have any experience with this? Thanks in advance Joe Stack trace: Exception in thread main 15/02/18 20:44:25 INFO scheduler.TaskSetManager: Lost task 5.3 in stage 1.0 (TID 283) on executor: java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE) [duplicate 6] org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0: java.lang.IllegalArgumentException: Size exceeds
Re: SparkSQL + Tableau Connector
Hi Silvio, I got this working today using your suggestion with the Initial SQL and a Custom Query. See here for details: http://stackoverflow.com/questions/28403664/connect-to-existing-hive-in-intellij-using-sbt-as-build/28608608#28608608 It is not ideal as I need to write a custom query, but it does work for now. I also have it working by doing a SaveAsTable on the ingested data, which stores the reference into the metastore for access via the thrift server. Thanks for the help. -Todd On Wed, Feb 11, 2015 at 8:41 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Hey Todd, I don't have an app to test against the thrift server; are you able to define custom SQL without using Tableau's schema query? I guess it's not possible to just use SparkSQL temp tables, you may have to use permanent Hive tables that are actually in the metastore so Tableau can discover them in the schema. In that case you will either have to generate the Hive tables externally from Spark or use Spark to process the data and save them using a HiveContext. From: Todd Nist Date: Wednesday, February 11, 2015 at 7:53 PM To: Andrew Lee Cc: Arush Kharbanda, user@spark.apache.org Subject: Re: SparkSQL + Tableau Connector First, sorry for the long post. So back to tableau and Spark SQL, I'm still missing something. TL;DR To get the Spark SQL Temp table associated with the metastore, are there additional steps required beyond doing the below? Initial SQL on connection: create temporary table test using org.apache.spark.sql.json options (path '/data/json/*'); cache table test; I feel like I'm missing a step of associating the Spark SQL table with the metastore; do I need to actually save it in some fashion? I'm trying to avoid saving to hive if possible. *Details:* I configured the hive-site.xml and placed it in $SPARK_HOME/conf. It looks like this, thanks Andrew and Arush for the assistance:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.semantic.analyzer.factory.impl</name>
    <value>org.apache.hcatalog.cli.HCatSemanticAnalyzerFactory</value>
  </property>
  <property>
    <name>hive.metastore.sasl.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
  </property>
  <property>
    <name>hive.server2.enable.doAs</name>
    <value>true</value>
  </property>
  <!--
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
  </property>
  -->
  <property>
    <name>hive.warehouse.subdir.inherit.perms</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>MySQL JDBC driver class</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hiveuser</value>
  </property>
</configuration>

When I start the server it looks fine:

$ ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --hiveconf hive.server2.thrift.bind.host radtech.io --master spark://radtech.io:7077 --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.34-bin.jar
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
radtech:spark tnist$ tail -f logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
15/02/11 19:15:24 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150211191524-0008/1 on hostPort 192.168.1.2:50851 with 2 cores, 512.0 MB RAM
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/0 is now LOADING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/1 is now LOADING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/0 is now RUNNING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/1 is now RUNNING
15/02/11 19:15:24 INFO NettyBlockTransferService: Server created on 50938
15/02/11 19:15:24 INFO BlockManagerMaster: Trying to register BlockManager
15/02/11 19:15:24 INFO BlockManagerMasterActor: Registering block manager 192.168.1.2:50938 with 265.1 MB RAM, BlockManagerId(driver, 192.168.1.2, 50938)
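A hedged sketch of the saveAsTable route Todd mentions, registering the ingested JSON in the Hive metastore so Tableau can discover it -- the path and table name are assumptions:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val events = hiveContext.jsonFile("/data/json/")
events.saveAsTable("test")  // now visible to the thrift server, and thus to Tableau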
bulk writing to HDFS in Spark Streaming?
Hi all, In Spark Streaming I want to use DStream.saveAsTextFiles with bulk writes, because the normal saveAsTextFiles cannot finish within the configured batch interval. Maybe a common pool of writers, or a separate worker assigned to do the bulk writing? Thanks! B/R Jichao
Re: JdbcRDD, ClassCastException with scala.Function0
Yup, I did see that. Good point though, Cody. The mismatch was happening for me when I was trying to get the 'new JdbcRDD' approach going. Once I switched to the 'create' method, things are working just fine. Was just able to refactor the 'get connection' logic into a 'DbConnection implements JdbcRDD.ConnectionFactory', and my 'map row' class is still 'MapRow implements org.apache.spark.api.java.function.Function<ResultSet, Row>'. This works fine and makes the driver program tighter. Of course, my next question is, how to work with the lower and upper bound parameters. As in, what if I don't know what the min and max ID values are and just want to extract all data from the table -- what should the params be, if that's even supported? And furthermore, what if the primary key on the table is not numeric, or if there's no primary key altogether? The method works fine with lowerBound=0 and upperBound=100, for example, but doesn't seem to have a way to say 'no upper bound' (-1 didn't work). On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger c...@koeninger.org wrote: Look at the definition of JdbcRDD.create:

def create[T](
    sc: JavaSparkContext,
    connectionFactory: ConnectionFactory,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {

JFunction here is the interface org.apache.spark.api.java.function.Function, not scala Function0. Likewise, ConnectionFactory is an interface defined inside JdbcRDD, not scala Function0. On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: That's exactly what I was doing. However, I ran into runtime issues with doing that. For instance, I had a public class DbConnection extends AbstractFunction0<Connection> implements Serializable. I got a runtime error from Spark complaining that DbConnection wasn't an instance of scala.Function0. I also had a public class MapRow extends scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements Serializable, with which I seemed to have more luck. On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger c...@koeninger.org wrote: Can't you implement the org.apache.spark.api.java.function.Function interface and pass an instance of that to JdbcRDD.create? On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Cody, you were right, I had a copy and paste snag where I ended up with a vanilla SparkContext rather than a Java one. I also had to *not* use my function subclasses, rather just use anonymous inner classes for the Function stuff, and that got things working. I'm fully following the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim. Is there a clean way to refactor out the custom Function classes, such as the one for getting a db connection or mapping ResultSet data to your own POJOs, rather than doing it all inline? On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org wrote: Is sc there a SparkContext or a JavaSparkContext? The compilation error seems to indicate the former, but JdbcRDD.create expects the latter. On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I have tried that as well, I get a compile error --

[ERROR] ...SparkProto.java:[105,39] error: no suitable method found for create(SparkContext, <anonymous ConnectionFactory>, String, int, int, int, <anonymous Function<ResultSet,Integer>>)

The code is a copy and paste:

JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
    sc,
    new JdbcRDD.ConnectionFactory() {
        public Connection getConnection() throws SQLException {
            return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
        }
    },
    "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
    1, 100, 1,
    new Function<ResultSet, Integer>() {
        public Integer call(ResultSet r) throws Exception {
            return r.getInt(1);
        }
    }
);

The other thing I've tried was to define a static class locally for GetConnection and use the JdbcRDD constructor. This got around the compile issues but blew up at runtime with NoClassDefFoundError: scala/runtime/AbstractFunction0 !

JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
    sc,
    (AbstractFunction0<Connection>) new DbConn(), // had to cast or a compile error
    SQL_QUERY,
    0L, 1000L, 10,
    new MapRow(),
    ROW_CLASS_TAG);

// DbConn is defined as public static class DbConn extends AbstractFunction0<Connection> implements Serializable

On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote: That test I linked https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90 is calling a static method JdbcRDD.create, not new JdbcRDD. Is that what you tried doing? On Wed, Feb 18, 2015 at 12:00 PM, Dmitry
Re: Unzipping large files and 2GB partition size.
oh, I think you are just choosing a number that is too small for your number of partitions. All of the data in /dir/to/gzfiles is going to be sucked into one RDD, with the data divided into partitions. So if you're parsing 200 files, each about 2 GB, and then repartitioning down to 100 partitions, you would expect 4 GB per partition. Though you're filtering the data down some, there may also be some bloat from your parsed objects. Also, if you're not using kryo for serialization, I'd strongly recommend it over the default serialization, and try to register all your classes. I think you can get some information about how much data is in your RDDs from the UI -- but it might depend on what version of spark you are running, plus I think the info isn't saved on failed stages, so you might just need to monitor it in the UI as it's happening (I am not 100% sure about that ...). So I'd suggest (a) using a lot more partitions (maybe 1k, given your data size) and (b) turning on kryo if you haven't already. On Thu, Feb 19, 2015 at 9:36 AM, Joe Wass jw...@crossref.org wrote: [message quoted in full above]
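A hedged sketch of the kryo advice above -- MyParsedRecord stands in for whatever class the parse step actually produces:

import org.apache.spark.SparkConf

case class MyParsedRecord(id: Long, body: String)  // placeholder for your parsed-object class

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyParsedRecord]))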
Re: Filtering keys after map+combine
I thought the combiner comes from reduceByKey and not mapPartitions, right... Let me dig deeper into the APIs. On Thu, Feb 19, 2015 at 8:29 AM, Daniel Siegmann daniel.siegm...@velos.io wrote: [earlier messages quoted in full above]
Re: JdbcRDD, ClassCastException with scala.Function0
That's a good point, thanks. Is there a way to instrument continuous real-time streaming of data out of a database? If the data keeps changing, one way to implement extraction would be to keep track of something like the last-modified timestamp and instrument the query to be 'where lastmodified > ?'. That would imply running the Spark program repetitively on a scheduled basis. I wonder if it's possible to just continuously stream any updates out instead, using Spark. On Thu, Feb 19, 2015 at 10:23 AM, Cody Koeninger c...@koeninger.org wrote: At the beginning of the code, do a query to find the current maximum ID. Don't just put in an arbitrarily large value, or all of your rows will end up in 1 spark partition at the beginning of the range. The question of keys is up to you... all that you need to be able to do is write a SQL statement that takes 2 numbers to specify the bounds. Of course, a numeric primary key is going to be the most efficient way to do that. On Thu, Feb 19, 2015 at 8:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Yup, I did see that. Good point though, Cody. The mismatch was happening for me when I was trying to get the 'new JdbcRDD' approach going. Once I switched to the 'create' method things are working just fine. I was just able to refactor the 'get connection' logic into a 'DbConnection implements JdbcRDD.ConnectionFactory' and my 'map row' class is still 'MapRow implements org.apache.spark.api.java.function.Function<ResultSet, Row>'. This works fine and makes the driver program tighter. Of course, my next question is how to work with the lower and upper bound parameters. As in, what if I don't know what the min and max ID values are and just want to extract all data from the table -- what should the params be, if that's even supported? And furthermore, what if the primary key on the table is not numeric, or there's no primary key altogether? The method works fine with lowerBound=0 and upperBound=100, for example, but there doesn't seem to be a way to say 'no upper bound' (-1 didn't work). On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger c...@koeninger.org wrote: Look at the definition of JdbcRDD.create:

  def create[T](
      sc: JavaSparkContext,
      connectionFactory: ConnectionFactory,
      sql: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int,
      mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {

JFunction here is the interface org.apache.spark.api.java.function.Function, not Scala's Function0. Likewise, ConnectionFactory is an interface defined inside JdbcRDD, not Scala's Function0. On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: That's exactly what I was doing. However, I ran into runtime issues with doing that. For instance, I had a public class DbConnection extends AbstractFunction0<Connection> implements Serializable. I got a runtime error from Spark complaining that DbConnection wasn't an instance of scala.Function0. I also had a public class MapRow extends scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements Serializable, with which I seemed to have more luck. On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger c...@koeninger.org wrote: Can't you implement the org.apache.spark.api.java.function.Function interface and pass an instance of that to JdbcRDD.create? On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Cody, you were right, I had a copy and paste snag where I ended up with a vanilla SparkContext rather than a Java one.
I also had to *not* use my Function subclasses and instead use anonymous inner classes for the Function stuff, and that got things working. I'm fully following the JdbcRDD.create approach from JavaJdbcRDDSuite.java, basically verbatim. Is there a clean way to refactor out the custom Function classes, such as the one for getting a db connection or mapping ResultSet data to your own POJOs, rather than doing it all inline? On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org wrote: Is sc there a SparkContext or a JavaSparkContext? The compilation error seems to indicate the former, but JdbcRDD.create expects the latter. On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I have tried that as well, I get a compile error -- [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for create(SparkContext, anonymous ConnectionFactory, String, int, int, int, anonymous Function<ResultSet,Integer>) The code is a copy and paste:

  JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
    sc,
    new JdbcRDD.ConnectionFactory() {
      public Connection getConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
      }
    },
    "SELECT DATA FROM FOO WHERE ? <= ID AND ID <=
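[For the archives, a hedged Scala sketch of one way to factor those pieces out. From Scala, the plain JdbcRDD constructor takes ordinary functions, so no Function subclasses are needed at all; the row class is a placeholder, and the table/column names are taken from the snippet above:]

  import java.sql.{Connection, DriverManager, ResultSet}
  import org.apache.spark.rdd.JdbcRDD

  // Placeholder row class for illustration.
  case class FooRow(data: Int)

  // The "get connection" logic, factored out as a plain function.
  def createConnection(): Connection =
    DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb")

  // The "map row" logic, factored out as a plain function.
  def toFooRow(rs: ResultSet): FooRow = FooRow(rs.getInt("DATA"))

  val rdd = new JdbcRDD(
    sc,
    createConnection _,
    "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
    lowerBound = 1,
    upperBound = 100,
    numPartitions = 3,
    mapRow = toFooRow _)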
Re: Some tasks taking too much time to complete in a stage
almost all your data is going to one task. You can see that the shuffle read for task 0 is 153.3 KB, and for most other tasks it's just 26.0 B (which is probably just some header saying there are no actual records). You need to ensure your data is more evenly distributed before this step. On Thu, Feb 19, 2015 at 10:53 AM, jatinpreet jatinpr...@gmail.com wrote: Hi, I am running Spark 1.2.1 for compute-intensive jobs comprising multiple tasks. I have observed that most tasks complete very quickly, but there are always one or two tasks that take a lot of time to complete, thereby increasing the overall stage time. What could be the reason for this? Following are the statistics for one such stage. As you can see, the task with index 0 takes 1.1 minutes whereas the others completed much more quickly.

Aggregated Metrics by Executor

Executor ID  Address       Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input  Output  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
0            slave1:56311  46 s       13           0             13               0.0 B  0.0 B   0.0 B         0.0 B          0.0 B                   0.0 B
1            slave2:42648  2.1 min    13           0             13               0.0 B  0.0 B   384.3 KB      0.0 B          0.0 B                   0.0 B
2            slave3:44322  23 s       12           0             12               0.0 B  0.0 B   136.4 KB      0.0 B          0.0 B                   0.0 B
3            slave4:37987  44 s       12           0             12               0.0 B  0.0 B   213.9 KB      0.0 B          0.0 B                   0.0 B

Tasks

Index  ID   Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read  Errors
0      213  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  1.1 min   1 s      153.3 KB
5      218  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
1      214  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  2 s       0.9 s    13.8 KB
4      217  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  26 ms              26.0 B
3      216  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  11 ms              0.0 B
2      215  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  27 ms              26.0 B
7      220  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  11 ms              0.0 B
10     223  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  23 ms              26.0 B
6      219  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  23 ms              26.0 B
9      222  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
8      221  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  23 ms              26.0 B
11     224  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
14     227  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  24 ms              26.0 B
13     226  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
16     229  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  22 ms              26.0 B
12     225  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  22 ms              26.0 B
15     228  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
17     230  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  22 ms              26.0 B
23     236  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
22     235  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  21 ms              26.0 B
19     232  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
21     234  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  25 ms              26.0 B
18     231  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  24 ms              26.0 B
20     233  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  28 ms              26.0 B
25     238  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  20 ms              26.0 B
28     241  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  27 ms              26.0 B
27     240  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B

Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Some-tasks-taking-too-much-time-to-complete-in-a-stage-tp21724.html Sent from the
Apache Spark User List mailing list archive at Nabble.com.
Re: Some tasks taking too much time to complete in a stage
Hi Imran, Thanks for pointing that out. My data comes from the HBase connector of Spark. I do not govern the distribution of data myself; HBase decides to put the data on any of the region servers. Is there a way to distribute data evenly? And I am especially interested in running even small loads very quickly, apart from bulk loads. Thanks, Jatin On Thu, Feb 19, 2015 at 10:28 PM, Imran Rashid iras...@cloudera.com wrote: almost all your data is going to one task. You can see that the shuffle read for task 0 is 153.3 KB, and for most other tasks it's just 26.0 B (which is probably just some header saying there are no actual records). You need to ensure your data is more evenly distributed before this step. [snip -- quoted question and task statistics are in the previous message]
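[For the archives, a hedged Scala sketch of one way to even things out when reading from HBase via the generic Hadoop input format (the table name is a placeholder). HBase hands Spark one partition per region, so a repartition right after the scan spreads skewed regions across the cluster before the expensive stage -- at the cost of a shuffle:]

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.Result
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat

  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table") // hypothetical table

  // One Spark partition per HBase region -- possibly very skewed.
  val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result])

  // Force an even shuffle before the compute-heavy stage; a partition
  // count proportional to total cores is a reasonable starting point.
  val balanced = hbaseRDD.repartition(sc.defaultParallelism * 3)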
Re: Why is RDD lookup slow?
If your dataset is large, there is a Spark package called IndexedRDD optimized for lookups. Feel free to check that out. Burak On Feb 19, 2015 7:37 AM, Ilya Ganelin ilgan...@gmail.com wrote: Hi Shahab - if your data structures are small enough, a broadcasted Map is going to provide faster lookup. Lookup within an RDD is an O(m) operation where m is the size of the partition. For RDDs with multiple partitions, executors can operate on it in parallel, so you get some improvement for larger RDDs. On Thu, Feb 19, 2015 at 7:31 AM shahab shahab.mok...@gmail.com wrote: Hi, I am doing lookups on cached RDDs [(Int, String)], and I noticed that the lookup is relatively slow, 30-100 ms. I even tried this on one machine with a single partition, but no difference! The RDDs are not large at all, 3-30 MB. Is this expected behaviour? Should I use another data structure, like a HashMap, to keep the data and look it up there, using Broadcast to send a copy to all machines? best, /Shahab
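[A minimal sketch of the broadcast alternative Ilya describes, with toy data standing in for Shahab's RDD:]

  // collectAsMap pulls the small RDD to the driver; the broadcast then
  // ships one read-only copy to each executor.
  val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
  val lookupMap = sc.broadcast(pairs.collectAsMap())

  // Each lookup is now an O(1) hash probe instead of a partition scan.
  val keys = sc.parallelize(Seq(1, 3))
  val values = keys.map(k => lookupMap.value.get(k))
  values.collect()  // Array(Some(a), Some(c))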
SchemaRDD.select
I am trying to pass a variable number of arguments to the select function of a SchemaRDD I created, as I want to select the fields at run time: val variable_argument_list = List('field1, 'field2) val schm1 = myschemaRDD.select('field1, 'field2) // works val schm2 = myschemaRDD.select(variable_argument_list:_*) // does not work I am interested in selecting at run time the fields from the myschemaRDD variable. However, the usual way of passing a variable number of arguments as a List in Scala fails. Is there a way of selecting a variable number of arguments in the select function? If not, what would be a better approach for selecting the required fields at run time? Thanks in advance for your help -- Cesar Flores
Re: percentile UDAF in Spark 1.2.0
Isn't that PR about being able to pass an array to the percentile function? If I understand this error correctly, it's not able to find the percentile function itself. Also, if I am incorrect and that PR fixes it, is it available in a release? On Thu, Feb 19, 2015 at 3:27 PM, Mark Hamstra m...@clearstorydata.com wrote: Already fixed: https://github.com/apache/spark/pull/2802 On Thu, Feb 19, 2015 at 3:17 PM, Mohnish Kodnani mohnish.kodn...@gmail.com wrote: Hi, I am trying to use percentile and getting the following error. I am using Spark 1.2.0. Does the percentile UDAF exist in that code line, and do I have to do something to get this to work? java.util.NoSuchElementException: key not found: percentile at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:53) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:220) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:218) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) Thanks mohnish
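[One hedged observation for the archives: percentile is a Hive UDAF, and the SimpleFunctionRegistry in the stack trace suggests a plain SQLContext, which does not consult Hive's function registry. A sketch assuming a Hive-enabled Spark build, with hypothetical table and column names:]

  import org.apache.spark.sql.hive.HiveContext

  val hiveContext = new HiveContext(sc)
  // Hive's registry resolves percentile; a plain SQLContext won't.
  // (Hive's percentile requires an integral column; percentile_approx
  // handles doubles.)
  val result = hiveContext.sql(
    "SELECT percentile(latency_ms, 0.95) FROM request_metrics")
  result.collect()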
Re: issue Running Spark Job on Yarn Cluster
Yes. On 19 Feb 2015 23:40, Harshvardhan Chauhan ha...@gumgum.com wrote: Is this the full stack trace? On Wed, Feb 18, 2015 at 2:39 AM, sachin Singh sachin.sha...@gmail.com wrote: Hi, I want to run my Spark job in Hadoop YARN cluster mode. I am using the command below: spark-submit --master yarn-cluster --driver-memory 1g --executor-memory 1g --executor-cores 1 --class com.dc.analysis.jobs.AggregationJob sparkanalitic.jar param1 param2 param3 I am getting the error below. Kindly suggest what's going wrong, and whether the command is proper or not. Thanks in advance. Exception in thread "main" org.apache.spark.SparkException: Application finished with failed status at org.apache.spark.deploy.yarn.ClientBase$class.run(ClientBase.scala:509) at org.apache.spark.deploy.yarn.Client.run(Client.scala:35) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:139) at org.apache.spark.deploy.yarn.Client.main(Client.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- *Harshvardhan Chauhan* | Software Engineer *GumGum* http://www.gumgum.com/ | *Ads that stick* 310-260-9666 | ha...@gumgum.com
using a database connection pool to write data into an RDBMS from a Spark application
Hi – I am trying to use BoneCP (a database connection pooling library) to write data from my Spark application to an RDBMS. The database inserts are inside a foreachPartition code block. I am getting this exception when the code tries to insert data using BoneCP: java.sql.SQLException: No suitable driver found for jdbc:postgresql://hostname:5432/dbname I tried explicitly loading the Postgres driver on the worker nodes by adding the following line inside the foreachPartition code block: Class.forName("org.postgresql.Driver") It didn’t help. Has anybody been able to get a database connection pool library to work with Spark? If you got it working, can you please share the steps? Thanks, Mohammed
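[For the archives, a hedged sketch of the usual pattern: a singleton pool per executor JVM, initialized lazily inside the workers so the non-serializable pool never travels from the driver. Credentials, table, and data are placeholders. Note that "No suitable driver" usually means the Postgres JDBC jar is not on the executor classpath at all -- ship it with --jars or spark.executor.extraClassPath; Class.forName inside the block can't help if the jar is missing:]

  import com.jolbox.bonecp.{BoneCP, BoneCPConfig}

  // A Scala object is initialized lazily and independently in each JVM,
  // so each executor builds its own pool on first use.
  object ConnectionPool {
    lazy val pool: BoneCP = {
      Class.forName("org.postgresql.Driver") // jar must be on the executor classpath
      val cfg = new BoneCPConfig()
      cfg.setJdbcUrl("jdbc:postgresql://hostname:5432/dbname")
      cfg.setUsername("user")     // placeholder
      cfg.setPassword("secret")   // placeholder
      new BoneCP(cfg)
    }
  }

  val rows = sc.parallelize(Seq("a", "b", "c")) // stand-in for the real data
  rows.foreachPartition { iter =>
    val conn = ConnectionPool.pool.getConnection()
    try {
      val stmt = conn.prepareStatement("INSERT INTO mytable (value) VALUES (?)")
      iter.foreach { v => stmt.setString(1, v); stmt.executeUpdate() }
      stmt.close()
    } finally {
      conn.close() // BoneCP returns the connection to the pool
    }
  }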
Re: Incorrect number of records after left outer join (I think)
If you have duplicate values for a key, join creates all pairs. E.g., if you have 2 values for key X in RDD A and 2 values for key X in RDD B, then A.join(B) will have 4 records for key X. On Thu, Feb 19, 2015 at 3:39 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: Consider the following left outer join: potentialDailyModificationsRDD = reducedDailyPairRDD.leftOuterJoin(baselinePairRDD).partitionBy(new HashPartitioner(1024)).persist(StorageLevel.MEMORY_AND_DISK_SER()); Below are the record counts for the RDDs involved: Number of records for reducedDailyPairRDD: 2565206 Number of records for baselinePairRDD: 56102812 Number of records for potentialDailyModificationsRDD: 2570115 Below are the partitioners for the RDDs involved: Partitioner for reducedDailyPairRDD: Some(org.apache.spark.HashPartitioner@400) Partitioner for baselinePairRDD: Some(org.apache.spark.HashPartitioner@400) Partitioner for potentialDailyModificationsRDD: Some(org.apache.spark.HashPartitioner@400) I realize in the above statement that the .partitionBy is probably not needed, as the underlying RDDs used in the left outer join are already hash partitioned. My question is how the resulting RDD (potentialDailyModificationsRDD) can end up with more records than reducedDailyPairRDD. I would think the number of records in potentialDailyModificationsRDD should be 2565206 instead of 2570115. Am I missing something, or is this possibly a bug? I'm using Apache Spark 1.2 on a stand-alone cluster on EC2. To get the counts for the records, I'm using .count() on the RDD. Thanks. Darin.
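[A tiny worked example of that multiplication. leftOuterJoin behaves the same way, which would explain Darin's extra rows if baselinePairRDD contains some duplicate keys:]

  val a = sc.parallelize(Seq(("X", 1), ("X", 2)))
  val b = sc.parallelize(Seq(("X", "p"), ("X", "q")))

  // Each of the 2 left values pairs with each of the 2 right values: 2 x 2 = 4.
  a.join(b).collect()
  // => Array((X,(1,p)), (X,(1,q)), (X,(2,p)), (X,(2,q)))

  a.leftOuterJoin(b).collect()
  // => Array((X,(1,Some(p))), (X,(1,Some(q))), (X,(2,Some(p))), (X,(2,Some(q))))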
percentile UDAF in Spark 1.2.0
Hi, I am trying to use percentile and getting the following error. I am using Spark 1.2.0. Does the percentile UDAF exist in that code line, and do I have to do something to get this to work? java.util.NoSuchElementException: key not found: percentile at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:53) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:220) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:218) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) Thanks mohnish
How to diagnose "could not compute split" errors and failed jobs?
My streaming app runs fine for a few hours and then starts spewing "Could not compute split, block input-xx-xxx not found" errors. After this, jobs start to fail and batches start to pile up. My question isn't so much about why this error occurs, but rather: how do I trace what leads to it? I am using disk+memory for storage, so it shouldn't be a case of data loss resulting from memory overrun. 15/02/18 22:04:49 ERROR JobScheduler: Error running job streaming job 142429705 ms.28 org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 247644.0 failed 64 times, most recent failure: Lost task 3.63 in stage 247644.0 (TID 3705290, node-dn1-16-test.abcdefg.com): java.lang.Exception: Could not compute split, block input-28-1424297042500 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Thanks, Tim
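[One hedged avenue to investigate, not from the thread itself: receiver blocks can be evicted or lost before processing catches up, and Spark 1.2 added a write-ahead log for received data. The config key below matches the 1.2-era documentation, but double-check it against your version's docs; the checkpoint path is a placeholder:]

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf()
    .setAppName("streaming-wal")
    // Persist received blocks to a write-ahead log in the checkpoint
    // directory so they survive eviction or executor loss.
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")

  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint("hdfs:///checkpoints/streaming-app") // WAL requires checkpointing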
Re: Filter data from one RDD based on data from another RDD
The more scalable alternative is to do a join (or a variant like cogroup, leftOuterJoin, subtractByKey etc., found in PairRDDFunctions); the downside is that this requires a shuffle of both your RDDs. On Thu, Feb 19, 2015 at 3:36 PM, Himanish Kushary himan...@gmail.com wrote: Hi, I have two RDDs with csv data as below: RDD-1 101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb43,19229261643 101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9229261645 101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,9229261647 101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d8,791191603 101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb42,19229261643 101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,290542385 101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,922926164 RDD-2 101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d9,7911160 101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,2954238 101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9226164 101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,92292164 101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,9226164 101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb40,929164 101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb39,26164 I need to filter RDD-2 to include only those records where the first column value in RDD-2 matches any of the first column values in RDD-1. Currently, I am broadcasting the first column values from RDD-1 as a list and then filtering RDD-2 based on that list:

  val rdd1broadcast = sc.broadcast(rdd1.map { uu => uu.split(",")(0) }.collect().toSet)
  val rdd2filtered = rdd2.filter { h => rdd1broadcast.value.contains(h.split(",")(0)) }

This will result in data with first column 101970_5854301838 (last two records) being filtered out of RDD-2. Is this the best way to accomplish this? I am worried that for large data volume, the broadcast step may become an issue. Appreciate any other suggestion. --- Thanks Himanish
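[A sketch of the join-based version for these CSV lines, assuming rdd1 and rdd2 are the RDD[String]s above, keying both on the first column:]

  // Key both RDDs by the first CSV column; only rdd1's key set matters.
  val keyedRdd1 = rdd1.map(line => (line.split(",")(0), ())).distinct()
  val keyedRdd2 = rdd2.map(line => (line.split(",")(0), line))

  // The inner join keeps only rdd2 lines whose key also occurs in rdd1;
  // unlike the broadcast approach, neither side has to fit on the driver.
  val rdd2filtered = keyedRdd2.join(keyedRdd1).map { case (_, (line, _)) => line }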
Failure on a Pipe operation
Hi, I'm trying to figure out why the following job is failing on a pipe: http://pastebin.com/raw.php?i=U5E8YiNN With this exception: http://pastebin.com/raw.php?i=07NTGyPP Any help is welcome. Thank you.
Re: Failure on a Pipe operation
The error message is telling you the exact problem: it can't find ProgramSIM, the thing you are trying to run. Lost task 3520.3 in stage 0.0 (TID 11, compute3.research.dev): java.io.IOException: Cannot run program "ProgramSIM": error=2, No such file or directory On Thu, Feb 19, 2015 at 5:52 PM, athing goingon athinggoin...@gmail.com wrote: Hi, I'm trying to figure out why the following job is failing on a pipe: http://pastebin.com/raw.php?i=U5E8YiNN With this exception: http://pastebin.com/raw.php?i=07NTGyPP Any help is welcome. Thank you.
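[For the archives, a hedged sketch of one common fix, assuming ProgramSIM lives at a placeholder path on the submitting machine: ship the binary with the job rather than requiring it to be pre-installed on every worker. Depending on deploy mode, the path may instead need to be resolved on the executors, e.g. pipe("./ProgramSIM") on YARN:]

  import org.apache.spark.SparkFiles

  // Distribute the executable to every node that runs tasks.
  sc.addFile("/local/path/to/ProgramSIM") // placeholder path

  val data = sc.parallelize(Seq("line1", "line2"))
  // SparkFiles.get resolves the local copy of the shipped file.
  val piped = data.pipe(SparkFiles.get("ProgramSIM"))
  piped.collect()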
Re: Spark job fails on cluster but works fine on a single machine
The stupid question is: are you deleting the file from HDFS on the right node? On Thu, Feb 19, 2015 at 11:31 AM Pavel Velikhov pavel.velik...@gmail.com wrote: Yeah, I do manually delete the files, but it still fails with this error. On Feb 19, 2015, at 8:16 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: When writing to HDFS, Spark will not overwrite existing files or directories. You must either manually delete these or use Java's Hadoop FileSystem class to remove them. Sent with Good (www.good.com) -Original Message- *From: *Pavel Velikhov [pavel.velik...@gmail.com] *Sent: *Thursday, February 19, 2015 11:32 AM Eastern Standard Time *To: *user@spark.apache.org *Subject: *Spark job fails on cluster but works fine on a single machine I have a simple Spark job that goes out to Cassandra, runs a pipe and stores results:

  val sc = new SparkContext(conf)
  val rdd = sc.cassandraTable("keyspace", "table")
    .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags"))))
    .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
    .map(r => convertStr(r))
    .coalesce(1, true)
    .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
    //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))

When run on a single machine, everything is fine if I save to an HDFS file or save to Cassandra. When run on the cluster, neither works: - When saving to file, I get an exception: User class threw exception: Output directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt already exists - When saving to Cassandra, only 4 rows are updated with empty data (I test on a 4-machine Spark cluster) Any hints on how to debug this and where the problem could be? - I delete the hdfs file before running - Would really like the output to hdfs to work, so I can debug - Then it would be nice to save to Cassandra
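[For the archives, a sketch of the usual programmatic fix for the "already exists" error: delete the output directory from the driver with the Hadoop FileSystem API, against the same HDFS the job writes to, right before saveAsTextFile:]

  import org.apache.hadoop.fs.{FileSystem, Path}

  val outputPath = new Path("hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt")
  // Resolve the FileSystem from the path's URI so this hits the cluster's
  // HDFS, not the local filesystem of whichever node runs the driver.
  val fs = FileSystem.get(outputPath.toUri, sc.hadoopConfiguration)
  if (fs.exists(outputPath)) {
    fs.delete(outputPath, true) // recursive: saveAsTextFile writes a directory
  }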