Re: jar changed on src filesystem
Since you are running in yarn-cluster mode and you are supplying the Spark assembly jar file, there is no need to install Spark on each node. Is it possible the two Spark jars have different versions? Chester Sent from my iPad On Jul 16, 2014, at 22:49, cmti95035 cmti95...@gmail.com wrote: Hi, I need some help with running Spark over YARN: I set up a cluster running HDP 2.0.6 with 6 nodes, and then installed spark-1.0.1-bin-hadoop2 on each node. When running the SparkPi example with the following command: ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 5 --driver-memory 4g --executor-memory 2g --executor-cores 1 --jars lib/spark-assembly-1.0.1-hadoop2.2.0.jar lib/spark-examples*.jar 10 the job failed with the following error: INFO yarn.Client: Application report from ASM: application identifier: application_1405545630872_0023 appId: 23 clientToAMToken: null appDiagnostics: Application application_1405545630872_0023 failed 2 times due to AM Container for appattempt_1405545630872_0023_02 exited with exitCode: -1000 due to: Resource hdfs://ip-172-31-9-187.us-west-1.compute.internal:8020/user/hdfs/.sparkStaging/application_1405545630872_0023/spark-assembly-1.0.1-hadoop2.2.0.jar changed on src filesystem (expected 1405574940411, was 1405574941940). Failing this attempt. Failing the application. I searched online for solutions and tried to sync up NTP but it doesn't seem to work. Can someone help? Your help is highly appreciated! Thanks, Jian -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jar-changed-on-src-filesystem-tp10011.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Using RDD in RDD transformation
I implemented a simple KNN classifier. I can run it successfully on a single sample, but it raises an error when it is run on an RDD of test samples. I attach the source code in an attachment. Looking forward to your reply! Best wishes to you! The following is the source code:

import math
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib._common import _dot

class KNN(object):
    def __init__(self, data, k):
        '''data: RDD of LabeledPoint'''
        self._data = data
        self._k = k
        self._data.cache()

    def predict(self, x):
        topksamples = self._data.map(lambda point: (_dot(point.features, x) / math.sqrt(_dot(point.features, x) * _dot(point.features, x)), point.label)).sortByKey(False).top(self._k)
        labeldict = {}
        for score, label in topksamples:
            labeldict.setdefault(label, 0)
            labeldict[label] += 1
        label = sorted([(label, count) for label, count in labeldict.items()], key=lambda x: x[1], reverse=True)[0][0]
        return label

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

sc = SparkContext(appName="PythonLR")
data = sc.textFile("file:///home/hadoop/spark/lr_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = KNN(parsedData, 20)

# Evaluating a single sample on training data
print 'Predict label is: %s' % model.predict(parsedData.first().features)

# Evaluating a sample set which is represented as an RDD
testData = sc.textFile("file:///home/hadoop/spark/lr_data.txt")
testData = testData.map(parsePoint)
labelsAndPreds = testData.map(lambda p: model.predict(p.features))
print labelsAndPreds.collect()

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-RDD-in-RDD-transformation-tp10014.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: can we insert and update with spark sql
Is this what you are looking for? https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html According to the doc, it is an "Operator that acts as a sink for queries on RDDs and can be used to store the output inside a directory of Parquet files. This operator is similar to Hive's INSERT INTO TABLE operation in the sense that one can choose to either overwrite or append to a directory. Note that consecutive insertions to the same table must have compatible (source) schemas." Thanks Best Regards On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo leo.h...@sap.com wrote: Hi As for Spark 1.0, can we insert into and update a table with Spark SQL, and how? Thanks Best Regard
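For reference, with the 1.0 API that operator is normally reached through SchemaRDD rather than used directly. A minimal sketch of writing and appending to a Parquet table follows; the class, table and path names are illustrative, not from the thread, and the append via insertInto is a sketch of the experimental 1.0 SchemaRDD API rather than a verified recipe:

import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)   // illustrative schema

val sqlContext = new SQLContext(sc)
import sqlContext._   // brings sql(), parquetFile() and the implicit RDD -> SchemaRDD conversion

// Write an RDD of case classes out as a directory of Parquet files.
val people = sc.textFile("people.txt").map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
people.saveAsParquetFile("people.parquet")

// Register the Parquet directory as a table; inserting into it goes through the
// operator linked above and appends to the directory.
parquetFile("people.parquet").registerAsTable("people_parquet")
people.insertInto("people_parquet")   // append the same rows again, for illustration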
Re: jar changed on src filesystem
They're all the same version. Actually even without the --jars parameter it got the same error. Looks like it needs to copy the assembly jar for running the example jar anyway during the staging. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jar-changed-on-src-filesystem-tp10011p10017.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Errors accessing hdfs while in local mode
You can try the following in the spark-shell:

1. Run it in cluster mode by going inside the spark directory:

$ SPARK_MASTER=spark://masterip:7077 ./bin/spark-shell

val textFile = sc.textFile("hdfs://masterip/data/blah.csv")
textFile.take(10).foreach(println)

2. Now try running in local mode:

$ SPARK_MASTER=local ./bin/spark-shell

val textFile = sc.textFile("hdfs://masterip/data/blah.csv")
textFile.take(10).foreach(println)

Both should print the first 10 lines from your blah.csv file.
preservesPartitioning
Hi All, The function mapPartitions in RDD.scala (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala) takes a boolean parameter preservesPartitioning. It seems that if that parameter is passed as false, the passed function f will operate on the data only once, whereas if it's passed as true the function will operate on each partition of the data. In my case, whatever boolean value I pass, f operates on each partition of the data. Any help regarding why I am getting this unexpected behaviour?
Re: preservesPartitioning
Hi Kamal, This is not what preservesPartitioning does -- actually what it means is that if the RDD has a Partitioner set (which means it's an RDD of key-value pairs and the keys are grouped into a known way, e.g. hashed or range-partitioned), your map function is not changing the partition of keys. This lets the job scheduler know that downstream operations, like joins or reduceByKey, can be optimized assuming that all the data for a given partition is located on the same machine. In both cases though, your function f operates on each partition. Just in case it's not clear, each RDD is composed of multiple blocks that are called the partitions. Each partition may be located on a different machine. mapPartitions is a way for you to operate on a whole partition at once, which is useful if you want to amortize a certain cost across the elements (e.g. you open a database connection and test each of them against the database). If you just want to see each element once and don't care about sharing stuff across them, use map(). Matei On Jul 17, 2014, at 12:02 AM, Kamal Banga banga.ka...@gmail.com wrote: Hi All, The function mapPartitions in RDD.scala takes a boolean parameter preservesPartitioning. It seems if that parameter is passed as false, the passed function f will operate on the data only once, whereas if it's passed as true the function will operate on each partition of the data. In my case, whatever boolean value I pass, f operates on each partition of data. Any help, regarding why I am getting this unexpected behaviour?
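To make the mapPartitions pattern Matei describes concrete, here is a minimal sketch; the Database helper is a hypothetical placeholder (not a real API), and rdd is assumed to be an RDD of items to look up:

// Sketch: pay the expensive setup cost once per partition instead of once per element.
// `Database.connect()` and `conn.lookup()` are hypothetical placeholders.
val annotated = rdd.mapPartitions { iter =>
  val conn = Database.connect()                              // opened once per partition
  val results = iter.map(x => (x, conn.lookup(x))).toList    // materialize before closing the connection
  conn.close()                                               // closed once per partition
  results.iterator
}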
Re: Kmeans
Yes, both run in parallel. Random is a baseline implementation of initialization, which may ignore small clusters. k-means++ improves random initialization by adding weights to points far away to the current candidates. You can view k-means|| as a more scalable version of K-means++. We don't provide k-means++ for initialization, but we used it as part of k-means||. Please check the papers for more details. -Xiangrui On Wed, Jul 16, 2014 at 10:27 PM, amin mohebbi aminn_...@yahoo.com wrote: Thank you for the response- Can we say that both implementations are computing the centroids in parallel? I mean in both cases will the data and code send to workers and the results will be collected and passed to Driver? and why we have three types of initialization in Mlib ? Initialization: • random • k-means++ • k-means|| Best Regards ... Amin Mohebbi PhD candidate in Software Engineering at university of Malaysia H/P : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my amin_...@me.com On Thursday, July 17, 2014 11:57 AM, Xiangrui Meng men...@gmail.com wrote: kmeans.py contains a naive implementation of k-means in python, served as an example of how to use pyspark. Please use MLlib's implementation in practice. There is a JIRA for making it clear: https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui On Wed, Jul 16, 2014 at 8:16 PM, amin mohebbi aminn_...@yahoo.com wrote: Can anyone explain to me what is difference between kmeans in Mlib and kmeans in examples/src/main/python/kmeans.py? Best Regards ... Amin Mohebbi PhD candidate in Software Engineering at university of Malaysia H/P : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my amin_...@me.com
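To make the initialization choice concrete, a small sketch with the MLlib Scala API (Spark 1.0-era; the input path and the k / iterations / runs values are illustrative assumptions):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Parse space-separated doubles into MLlib vectors (illustrative input path).
val data = sc.textFile("hdfs://namenode/data/kmeans_data.txt")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .cache()

// The last argument picks the initialization: KMeans.RANDOM ("random") or
// KMeans.K_MEANS_PARALLEL ("k-means||"); k-means++ is only used internally by k-means||.
val model = KMeans.train(data, 20, 10, 1, KMeans.K_MEANS_PARALLEL)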
class after join
Hi all, I am a newbie Spark user with many doubts, so sorry if this is a silly question. I am dealing with tabular data formatted as text files, so when I first load the data, my code is like this:

case class data_class(
  V1: String, V2: String, V3: String, V4: String,
  V5: String, V6: String, V7: String)

val data = sc.textFile(data_path).map(x => {
  val fields = (x + " ").split("\t")
  data_class(fields(0).trim(), fields(1).trim(), fields(2).trim(), fields(3).trim(),
    fields(4).trim(), fields(5).trim(), fields(6).trim())
})

I am doing this because I would like to access each position using the variable name (V1...V7). Is there any other way of doing this? Also related to this question, if I have data with more than 22 variables, I am restricted to using a class instead of a case class. However, this kind of solution has many restrictions, mainly related to getter methods. Is there any other way of doing this? And finally, one of my main problems comes after operations on different data variables. For instance, if I have two different variables (data1 and data2), and I want to join them both as:

val data3 = data1.keyBy(_.V1).leftOuterJoin(data2.keyBy(_.V1))

then I have to post-process data3 in order to obtain a new class that contains those variables from data1 and also those variables from data2. As data3 is (key, (data1, data2)), do I have to create a new different class with all these attributes from data1 and data2? This is kind of annoying when there are many attributes. Thanks in advance, Best
Re: MLLib - Regularized logistic regression in python
1) This is a miss, unfortunately ... We will add support for regularization and intercept in the coming v1.1. (JIRA: https://issues.apache.org/jira/browse/SPARK-2550) 2) It has overflow problems in Python but not in Scala. We can stabilize the computation by ensuring exp only takes a negative value: 1 / (1 + e^x) = 1 - 1 / (1 + e^{-x}). (JIRA: https://issues.apache.org/jira/browse/SPARK-2552) -Xiangrui On Wed, Jul 16, 2014 at 7:58 PM, Yanbo Liang yanboha...@gmail.com wrote: AFAIK for question 2, there is no built-in method to account for that problem. Right now, we can only perform one type of regularization. However, the elastic net implementation is just underway. You can refer to this topic for further discussion. https://issues.apache.org/jira/browse/SPARK-1543 2014-07-17 2:08 GMT+08:00 fjeg francisco.gime...@gmail.com: 1) Okay, to clarify, there is *no* way to regularize logistic regression in python (sorry if I'm repeating your answer). 2) This method you described will have overflow errors when abs(margin) > 750. Is there a built-in method to account for this? Otherwise, I will probably have to implement something like this: http://lingpipe-blog.com/2012/02/16/howprevent-overflow-underflow-logistic-regression Also, another question about the Scala implementation: Can we only do one type of regularization? Is there any way to perform elastic net which is a combination of L1 and L2? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-Regularized-logistic-regression-in-python-tp9780p9963.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
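As an illustration of that identity, a standalone sketch (not MLlib code):

// Numerically stable logistic function 1 / (1 + e^(-x)): exp only ever sees a
// non-positive argument, so it cannot overflow, per the identity above.
def sigmoid(x: Double): Double =
  if (x >= 0) {
    1.0 / (1.0 + math.exp(-x))
  } else {
    val ex = math.exp(x)
    ex / (1.0 + ex)
  }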
Re: Kryo deserialisation error
Seems like there is some sort of stream corruption, causing Kryo read to read a weird class name from the stream (the name arl Fridtjof Rode in the exception cannot be a class!). Not sure how to debug this. @Patrick: Any idea? On Wed, Jul 16, 2014 at 10:14 PM, Hao Wang wh.s...@gmail.com wrote: I am not sure. Not every task will fail with this Kryo exception. Most of the time, the cluster can successfully finish the WikipediaPageRank. How could I debug this exception? Thanks Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com On Thu, Jul 17, 2014 at 2:58 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Is the class that is not found in the wikipediapagerank jar? TD On Wed, Jul 16, 2014 at 12:32 AM, Hao Wang wh.s...@gmail.com wrote: Thanks for your reply. The SparkContext is configured as below:

sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
val inputFile = args(0)
val threshold = args(1).toDouble
val numPartitions = args(2).toInt
val usePartitioner = args(3).toBoolean
sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.executor.memory", "60g")
sparkConf.set("spark.cores.max", "48")
sparkConf.set("spark.kryoserializer.buffer.mb", "24")
val sc = new SparkContext(sparkConf)
sc.addJar("~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar")

And I use spark-submit to run the application:

./bin/spark-submit --master spark://sing12:7077 --total-executor-cores 40 --executor-memory 40g --class org.apache.spark.examples.bagel.WikipediaPageRank ~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar hdfs://192.168.1.12:9000/freebase-26G 1 200 True

Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com On Wed, Jul 16, 2014 at 1:41 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using classes from external libraries that have not been added to the sparkContext, using sparkcontext.addJar()?
TD On Tue, Jul 15, 2014 at 8:36 PM, Hao Wang wh.s...@gmail.com wrote: I am running the WikipediaPageRank in Spark example and share the same problem with you: 4/07/16 11:31:06 DEBUG DAGScheduler: submitStage(Stage 6) 14/07/16 11:31:06 ERROR TaskSetManager: Task 6.0:450 failed 4 times; aborting job 14/07/16 11:31:06 INFO DAGScheduler: Failed to run foreach at Bagel.scala:251 Exception in thread main 14/07/16 11:31:06 INFO TaskSchedulerImpl: Cancelling stage 6 org.apache.spark.SparkException: Job aborted due to stage failure: Task 6.0:450 failed 4 times, most recent failure: Exception failure in TID 1330 on host sing11: com.esotericsoftware.kryo.KryoException: Unable to find class: arl Fridtjof Rode com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) com.twitter.chill.TraversableSerializer.read(Traversable.scala:44) com.twitter.chill.TraversableSerializer.read(Traversable.scala:21) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96) org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95) org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) Anyone cloud help? Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com On Tue, Jun 3, 2014 at 8:02 PM, Denes te...@outlook.com wrote: I tried to use Kryo as a serialiser isn spark streaming, did everything according to the guide posted on the spark website, i.e. added the following lines: conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer); conf.set(spark.kryo.registrator,
Re: Kryo deserialisation error
Not sure if this helps, but it does seem to be part of a name in a Wikipedia article, and Wikipedia is the data set. So something is reading this class name from the data. http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Seems like there is some sort of stream corruption, causing Kryo read to read a weird class name from the stream (the name arl Fridtjof Rode in the exception cannot be a class!). Not sure how to debug this. @Patrick: Any idea?
Re: class after join
Thank you for your fast reply. We are considering this Map[String, String] solution, but there are some details that we do not control yet. What would happen if we have different data types for different fields? Also, with this solution, we have to repeat the field names for every row that we have, is this efficient? Regarding the solution with composition, the key would be repeated in the new class, whereas it is only necessary once after the join, isn't it? On Thu, Jul 17, 2014 at 10:25 AM, Sean Owen so...@cloudera.com wrote: If what you have is a large number of named strings, why not use a Map[String,String] to represent them? If you're approaching a class with 22 String fields anyway, it probably makes more sense. You lose a bit of compile-time checking, but gain flexibility. Also, merging two Maps to make a new one is pretty simple, compared to making many of these values classes. (Although, if you otherwise needed a class that represented all of the things in class A and class B, this could be done easily with composition, a class with an A and a B inside.) On Thu, Jul 17, 2014 at 9:15 AM, Luis Guerra luispelay...@gmail.com wrote: Hi all, I am a newbie Spark user with many doubts, so sorry if this is a silly question. I am dealing with tabular data formatted as text files, so when I first load the data, my code is like this: case class data_class( V1: String, V2: String, V3: String, V4: String, V5: String, V6: String, V7: String) val data= sc.textFile(data_path) .map(x = { val fields = (x+ ).split(\t) data_class(fields(0).trim(),fields(1).trim(),fields(2).trim(),fields(3).trim(), fields(4).trim(), fields(5).trim(),fields(6).trim()) }) I am doing this because I would like to access to each position using the variable name (V1...V7). Is there any other way of doing this? Also related to this question, if I have data with more than 22 variables, I am restringed to use class instead of case class. However, this kind of solution has many restrictions mainly related to getter methods. Is there any other way of doing this? And finally, one of my main problems comes after operations of different data variables. For instance, if I have two different variables (data1 and data2), and I want to join them both as: val data3 = data1.keyBy(_.V1).leftOuterJoin(data2.keyBy(_.V1)) Then I have to post process data3 in order to obtain a new class that contains those variables from data1 and also those variables from data2. As data3 is (key, (data1, data2)), do I have to create a new different class with all these attributes from data1 and data2? This is kind of annoying when there are many attributes. Thanks in advance, Best
Re: Pysparkshell are not listing in the web UI while running
Hi Neethu, Your application is running in local mode and that's the reason why you are not seeing the driver app in the 8080 webUI. You can pass the Master IP to your pyspark and get it running in cluster mode. eg: IPYTHON_OPTS="notebook --pylab inline" $SPARK_HOME/bin/pyspark --master spark://master:7077 Replace master:7077 with the Spark URI that you are seeing in the top left of the 8080 webUI. Thanks Best Regards On Thu, Jul 17, 2014 at 1:35 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, I just upgraded to spark 1.0.1. In spark 1.0.0, when I started the IPython notebook using the following command, it used to appear in the running applications tab in the master:8080 web UI. IPYTHON_OPTS="notebook --pylab inline" $SPARK_HOME/bin/pyspark But now when I run it, it's not getting listed under running applications/completed applications (once it's closed). But I am able to see the Spark stages at master:4040 while it's running. Anyone have any idea why this is happening? Thanks Regards, Meethu M
Bad Digest error while doing aws s3 put
Hi, I am getting the following error while trying to save a large dataset to S3 using the saveAsHadoopFile command with Apache Spark 1.0. org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 PUT failed for '/spark_test%2Fsmaato_one_day_phase_2%2Fsmaato_2014_05_17%2F_temporary%2F_attempt_201407170658__m_36_276%2Fpart-00036' XML Error Message: <?xml version="1.0" encoding="UTF-8"?><Error><Code>BadDigest</Code><Message>The Content-MD5 you specified did not match what we received.</Message><ExpectedDigest>N808DtNfYiTFzI+i2HxLEw==</ExpectedDigest><CalculatedDigest>66nS+2C1QqQmmcTeFpXOjw==</CalculatedDigest><RequestId>4FB3A3D60B187CE7</RequestId><HostId>H2NznP+RvwspekVHBMvgWGYAupKuO5YceSgmiLym6rOajOh5v5GnyM0VkO+dadyG</HostId></Error> I have used the same command to write similar content with less data to S3 without any problem. When I googled this error message, they say it might be due to an MD5 checksum mismatch. But will this happen due to load? Regards, lmk -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bad-Digest-error-while-doing-aws-s3-put-tp10036.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Apache kafka + spark + Parquet
Hi All, Currently we are reading (multiple) topics from Apache Kafka and storing them in HBase (multiple tables) using Twitter Storm (one tuple is stored in 4 different tables), but we are facing some performance issues with HBase. So we are replacing HBase with Parquet files and Storm with Apache Spark. Difficulties: 1. How to read multiple topics from Kafka using Spark? 2. One tuple belongs to multiple tables; how to write one topic to multiple Parquet files with proper partitioning using Spark? Please help me. Thanks in advance. -- Regards, Mahebub
Spark scheduling with Capacity scheduler
Hi all, I'm using HDP 2.0, YARN. I'm running both MapReduce and Spark jobs on this cluster, is it possible somehow use Capacity scheduler for Spark jobs management as well as MR jobs? I mean, I'm able to send MR job to specific queue, may I do the same with Spark job? thank you in advance Thank you, Konstantin Kudryavtsev
Re: Apache kafka + spark + Parquet
1. You can put in multiple kafka topics in the same Kafka input stream. See the example KafkaWordCount https://github.com/apache/spark/blob/68f28dabe9c7679be82e684385be216319beb610/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala . However they will all be read through a single receiver (though multiple threads, one per topic). To parallelize the read (for increasing throughput), you can create multiple Kafka input streams, and splits the topics appropriately between them. 2. You can easily read and write to parquet files in Spark. Any RDD (generated through DStreams in Spark Streaming, or otherwise), can be converted to a SchemaRDD and then saved in the parquet format as rdd.saveAsParquetFile. See the Spark SQL guide http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files for more details. So if you want to write a same dataset (as RDDs) to two different parquet files, you just have to call saveAsParquetFile twice (on same or transformed versions of the RDD), as shown in the guide. Hope this helps! TD On Thu, Jul 17, 2014 at 2:19 AM, Mahebub Sayyed mahebub...@gmail.com wrote: Hi All, Currently we are reading (multiple) topics from Apache kafka and storing that in HBase (multiple tables) using twitter storm (1 tuple stores in 4 different tables). but we are facing some performance issue with HBase. so we are replacing* HBase* with *Parquet* file and *storm* with *Apache Spark*. difficulties: 1. How to read multiple topics from kafka using spark? 2. One tuple belongs to multiple tables, How to write one topic to multiple parquet files with proper partitioning using spark?? Please help me Thanks in advance. -- *Regards,* *Mahebub *
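A rough end-to-end sketch of both points (Spark 1.0-era APIs; the topic names, ZooKeeper quorum, record schema, output path and sparkConf are illustrative assumptions):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.sql.SQLContext

case class Record(key: String, value: String)   // illustrative schema

val ssc = new StreamingContext(sparkConf, Seconds(10))
val sqlContext = new SQLContext(ssc.sparkContext)
import sqlContext.createSchemaRDD   // implicit RDD[case class] -> SchemaRDD

// 1. Several Kafka input streams (one receiver each) over a split of the topics.
val topicGroups = Seq(Map("topicA" -> 1, "topicB" -> 1), Map("topicC" -> 1))
val streams = topicGroups.map(topics =>
  KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", topics))
val unified = ssc.union(streams)

// 2. Convert each batch to a SchemaRDD and save it as Parquet; calling
// saveAsParquetFile twice (on the same or transformed RDDs) would write two datasets.
unified.map { case (_, line) =>
  val parts = line.split(",")
  Record(parts(0), parts(1))
}.foreachRDD { rdd =>
  rdd.saveAsParquetFile("hdfs://namenode:8020/user/spark/records/batch-" + System.currentTimeMillis)
}

ssc.start()
ssc.awaitTermination()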
Re: Speeding up K-Means Clustering
Is it v0.9? Did you run in local mode? Try to set --driver-memory 4g and repartition your data to match the number of CPU cores such that the data is evenly distributed. You need 1m * 50 * 8 ~ 400MB to store the data. Make sure there is enough memory for caching. -Xiangrui On Thu, Jul 17, 2014 at 1:48 AM, Ravishankar Rajagopalan viora...@gmail.com wrote: I am trying to use MLlib for K-Means clustering on a data set with 1 million rows and 50 columns (all columns have double values) which is on HDFS (raw txt file is 28 MB) I initially tried the following: val data3 = sc.textFile(hdfs://...inputData.txt) val parsedData3 = data3.map( _.split('\t').map(_.toDouble)) val numIterations = 10 val numClusters = 200 val clusters = KMeans.train(parsedData3, numClusters, numIterations) This took me nearly 850 seconds. I tried using persist with MEMORY_ONLY option hoping that this would significantly speed up the algorithm: val data3 = sc.textFile(hdfs://...inputData.txt) val parsedData3 = data3.map( _.split('\t').map(_.toDouble)) parsedData3.persist(MEMORY_ONLY) val numIterations = 10 val numClusters = 200 val clusters = KMeans.train(parsedData3, numClusters, numIterations) This resulted in only a marginal improvement and took around 720 seconds. Is there any other way to speed up the algorithm further? Thank you. Regards, Ravi
Re: Kryo deserialisation error
Hi all, Yes, it's the name of a Wikipedia article. I am running the WikipediaPageRank example of Spark Bagel. I am wondering whether there is any relation to the buffer size of Kryo. The PageRank sometimes finishes successfully and sometimes does not, because this kind of Kryo exception happens too many times, which exceeds maxTaskFailures. I find this "Kryo exception: unable to find class" in my successful cases, too. Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com On Thu, Jul 17, 2014 at 4:44 PM, Sean Owen so...@cloudera.com wrote: Not sure if this helps, but it does seem to be part of a name in a Wikipedia article, and Wikipedia is the data set. So something is reading this class name from the data. http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Seems like there is some sort of stream corruption, causing Kryo read to read a weird class name from the stream (the name arl Fridtjof Rode in the exception cannot be a class!). Not sure how to debug this. @Patrick: Any idea?
Re: Pysparkshell are not listing in the web UI while running
Hi Akhil, That fixed the problem...Thanks Thanks Regards, Meethu M On Thursday, 17 July 2014 2:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Neethu, Your application is running on local mode and that's the reason why you are not seeing the driver app in the 8080 webUI. You can pass the Master IP to your pyspark and get it running in cluster mode. eg: IPYTHON_OPTS=notebook --pylab inline $SPARK_HOME/bin/pyspark --master spark://master:7077 Replace master:7077 with the spark uri that you are seeing in top left of the 8080 webui. Thanks Best Regards On Thu, Jul 17, 2014 at 1:35 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, I just upgraded to spark 1.0.1. In spark 1.0.0 when I start Ipython notebook using the following command,it used to come in the running applications tab in master:8080 web UI. IPYTHON_OPTS=notebook --pylab inline $SPARK_HOME/bin/pyspark But now when I run it,its not getting listed under running application/completed application(once its closed).But I am able to see the spark stages at master:4040 while its running Anyone have any idea why this Thanks Regards, Meethu M
GraphX Pregel implementation
Hi, I am trying to implement the belief propagation algorithm in GraphX using the Pregel API:

def pregel[A](initialMsg: A, maxIter: Int = Int.MaxValue, activeDir: EdgeDirection = EdgeDirection.Out)
             (vprog: (VertexId, VD, A) => VD,
              sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
              mergeMsg: (A, A) => A)

With this, can we create messages in the vprog function (from the incoming messages) and send them using sendMsg? Regards Arun
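For reference, here is the standard single-source shortest-paths sketch with this API (adapted from the GraphX programming guide; the input graph and source vertex id are assumptions). Note that vprog only absorbs the already-merged incoming message into the vertex value, while new messages are produced in sendMsg:

import org.apache.spark.graphx._

val sourceId: VertexId = 42L   // illustrative source vertex
// graph: Graph[_, Double] with Double edge weights is assumed to exist.
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),            // vprog: absorb the merged message
  triplet => {                                               // sendMsg: create new messages
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else
      Iterator.empty
  },
  (a, b) => math.min(a, b)                                   // mergeMsg
)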
Re: Apache kafka + spark + Parquet
Hi, To migrate data from *HBase *to *Parquet* we used following query through * Impala*: INSERT INTO table PARQUET_HASHTAGS( key, city_name, country_name, hashtag_date, hashtag_text, hashtag_source, hashtag_month, posted_time, hashtag_time, tweet_id, user_id, user_name, hashtag_year ) *partition(year, month, day)* SELECT key, city_name, country_name, hashtag_date, hashtag_text, hashtag_source, hashtag_month, posted_time, hashtag_time, tweet_id, user_id, user_name,hashtag_year, cast(hashtag_year as int),cast(hashtag_month as int), cast(hashtag_date as int) from HASHTAGS where hashtag_year='2014' and hashtag_month='04' and hashtag_date='01' ORDER BY key,hashtag_year,hashtag_month,hashtag_date LIMIT 1000 offset 0; using above query we have successfully migrated form HBase to Parquet files with proper partitions. Now we are storing Data direct from *Kafka *to *Parquet.* *How is it possible to create partitions while storing data direct from kafka to Parquet files??* *(likewise created in above query)* On Thu, Jul 17, 2014 at 12:35 PM, Tathagata Das tathagata.das1...@gmail.com wrote: 1. You can put in multiple kafka topics in the same Kafka input stream. See the example KafkaWordCount https://github.com/apache/spark/blob/68f28dabe9c7679be82e684385be216319beb610/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala . However they will all be read through a single receiver (though multiple threads, one per topic). To parallelize the read (for increasing throughput), you can create multiple Kafka input streams, and splits the topics appropriately between them. 2. You can easily read and write to parquet files in Spark. Any RDD (generated through DStreams in Spark Streaming, or otherwise), can be converted to a SchemaRDD and then saved in the parquet format as rdd.saveAsParquetFile. See the Spark SQL guide http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files for more details. So if you want to write a same dataset (as RDDs) to two different parquet files, you just have to call saveAsParquetFile twice (on same or transformed versions of the RDD), as shown in the guide. Hope this helps! TD On Thu, Jul 17, 2014 at 2:19 AM, Mahebub Sayyed mahebub...@gmail.com wrote: Hi All, Currently we are reading (multiple) topics from Apache kafka and storing that in HBase (multiple tables) using twitter storm (1 tuple stores in 4 different tables). but we are facing some performance issue with HBase. so we are replacing* HBase* with *Parquet* file and *storm* with *Apache Spark*. difficulties: 1. How to read multiple topics from kafka using spark? 2. One tuple belongs to multiple tables, How to write one topic to multiple parquet files with proper partitioning using spark?? Please help me Thanks in advance. -- *Regards,* *Mahebub * -- *Regards,* *Mahebub Sayyed*
Re: Speeding up K-Means Clustering
Hi Xiangrui, Yes I am using Spark v0.9 and am not running it in local mode. I did the memory setting using export SPARK_MEM=4G before starting the Spark instance. Also previously, I was starting it with -c 1 but changed it to -c 12 since it is a 12 core machine. It did bring down the time taken to less than 200 seconds from over 700 seconds. I am not sure how to repartition the data to match the CPU cores. How do I do it? Thank you. Ravi On Thu, Jul 17, 2014 at 3:17 PM, Xiangrui Meng men...@gmail.com wrote: Is it v0.9? Did you run in local mode? Try to set --driver-memory 4g and repartition your data to match number of CPU cores such that the data is evenly distributed. You need 1m * 50 * 8 ~ 400MB to storage the data. Make sure there are enough memory for caching. -Xiangrui On Thu, Jul 17, 2014 at 1:48 AM, Ravishankar Rajagopalan viora...@gmail.com wrote: I am trying to use MLlib for K-Means clustering on a data set with 1 million rows and 50 columns (all columns have double values) which is on HDFS (raw txt file is 28 MB) I initially tried the following: val data3 = sc.textFile(hdfs://...inputData.txt) val parsedData3 = data3.map( _.split('\t').map(_.toDouble)) val numIterations = 10 val numClusters = 200 val clusters = KMeans.train(parsedData3, numClusters, numIterations) This took me nearly 850 seconds. I tried using persist with MEMORY_ONLY option hoping that this would significantly speed up the algorithm: val data3 = sc.textFile(hdfs://...inputData.txt) val parsedData3 = data3.map( _.split('\t').map(_.toDouble)) parsedData3.persist(MEMORY_ONLY) val numIterations = 10 val numClusters = 200 val clusters = KMeans.train(parsedData3, numClusters, numIterations) This resulted in only a marginal improvement and took around 720 seconds. Is there any other way to speed up the algorithm further? Thank you. Regards, Ravi
Re: Getting pyspark.resultiterable.ResultIterable at xxxxxx in local shell
Could someone please help me resolve the "This post has NOT been accepted by the mailing list yet." issue. I registered and subscribed to the mailing list many days ago but my post is still in the unaccepted state. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-pyspark-resultiterable-ResultIterable-at-xx-in-local-shell-tp9490p10046.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming timing considerations
Hi TD, I have been able to filter the first WindowedRDD, but I am not sure how to make a generic filter. The larger window is 8 seconds and I want to fetch 4 seconds based on the application timestamp. I have seen an earlier post which suggests a timestamp-based window, but I am not sure how to build it in the following example:

val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(windowedRDD => {
  //val timeStampBasedWindow = ???                // define the window over the timestamp that you want to process
  val filteredRDD = windowedRDD.filter(_._1 > 4)  // filter and retain only the records that fall in the timestamp-based window
  filteredRDD
})

Consider the input tuples as (1,23), (1.2,34) . . . . . (3.8,54), (4,413) . . . where the key is the timestamp. Regards, Laeeq On Saturday, July 12, 2014 8:29 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, Thanks I will try to implement it. Regards, Laeeq On Saturday, July 12, 2014 4:37 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This is not in the current streaming API. Queue stream is useful for testing with generated RDDs, but not for actual data. For actual data stream, the slack time can be implemented by doing DStream.window on a larger window that take slack time in consideration, and then the required application-time-based-window of data filtered out. For example, if you want a slack time of 1 minute and batches of 10 seconds, then do a window operation of 70 seconds, then in each RDD filter out the records with the desired application time and process them. TD On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, In the spark streaming paper, slack time has been suggested for delaying the batch creation in case of external timestamps. I don't see any such option in streamingcontext. Is it available in the API? Also going through the previous posts, queueStream has been suggested for this. I looked into the queueStream example. // Create and push some RDDs into Queue for (i <- 1 to 30) { rddQueue += ssc.sparkContext.makeRDD(1 to 10) Thread.sleep(1000) } The only thing I am unsure of is how to make batches (basic RDDs) out of a stream coming on a port. Regards, Laeeq
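For what it's worth, a rough sketch of the filter TD describes, assuming the key of each tuple is the application timestamp in seconds and that the desired 4 seconds of application time end at the batch time (the exact boundaries are an assumption):

import org.apache.spark.streaming.{Seconds, Time}

val transformed = keyAndValues.window(Seconds(8), Seconds(4))
  .transform((windowedRDD, time: Time) => {
    val windowEnd = time.milliseconds / 1000.0   // batch time in seconds
    val windowStart = windowEnd - 4.0            // keep 4 seconds of application time
    windowedRDD.filter { case (ts, _) => ts >= windowStart && ts < windowEnd }
  })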
Equivalent functions for NVL() and CASE expressions in Spark SQL
Do we have any equivalent scala functions available for NVL() and CASE expressions to use in spark sql?
Re: Simple record matching using Spark SQL
Added below 2 lines just before the sql query line - *...* *file1_schema.count;* *file2_schema.count;* *...* and it started working. But I couldn't get the reason. Can someone please explain me? What was happening earlier and what is happening with addition of these 2 lines? ~Sarath On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: No Sonal, I'm not doing any explicit call to stop context. If you see my previous post to Michael, the commented portion of the code is my requirement. When I run this over standalone spark cluster, the execution keeps running with no output or error. After waiting for several minutes I'm killing it by pressing Ctrl+C in the terminal. But the same code runs perfectly when executed from spark shell. ~Sarath On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal sonalgoy...@gmail.com wrote: Hi Sarath, Are you explicitly stopping the context? sc.stop() Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Michael, Soumya, Can you please check and let me know what is the issue? what am I missing? Let me know if you need any logs to analyze. ~Sarath On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Michael, Tried it. It's correctly printing the line counts of both the files. Here's what I tried - *Code:* *package test* *object Test4 {* * case class Test(fld1: String, * * fld2: String, * * fld3: String, * * fld4: String, * * fld5: String, * * fld6: Double, * * fld7: String);* * def main(args: Array[String]) {* *val conf = new SparkConf()* *.setMaster(args(0))* * .setAppName(SQLTest)* * .setSparkHome(args(1))* * .set(spark.executor.memory, 2g);* *val sc = new SparkContext(conf);* *sc.addJar(test1-0.1.jar);* *val file1 = sc.textFile(args(2));* *println(file1.count());* *val file2 = sc.textFile(args(3));* *println(file2.count());* *//val sq = new SQLContext(sc);* *//import sq._* *//val file1_recs: RDD[Test] = file1.map(_.split(,)).map(l = Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));* *//val file2_recs: RDD[Test] = file2.map(_.split(,)).map(s = Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));* *//val file1_schema = sq.createSchemaRDD(file1_recs);* *//val file2_schema = sq.createSchemaRDD(file2_recs);* *//file1_schema.registerAsTable(file1_tab);* *//file2_schema.registerAsTable(file2_tab);* *//val matched = sq.sql(select * from file1_tab l join file2_tab s on + * *// l.fld7=s.fld7 where l.fld2=s.fld2 and + * *// l.fld3=s.fld3 and l.fld4=s.fld4 and + * *// l.fld6=s.fld6);* *//matched.collect().foreach(println);* * }* *}* *Execution:* *export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar* *export CONFIG_OPTS=-Dspark.jars=test1-0.1.jar* *java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 /usr/local/spark-1.0.1-bin-hadoop1 hdfs://master:54310/user/hduser/file1.csv hdfs://master:54310/user/hduser/file2.csv* ~Sarath On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust mich...@databricks.com wrote: What if you just run something like: *sc.textFile(hdfs://localhost:54310/user/hduser/file1.csv).count()* On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Yes Soumya, I did it. First I tried with the example available in the documentation (example using people table and finding teenagers). 
After successfully running it, I moved on to this one which is starting point to a bigger requirement for which I'm evaluating Spark SQL. On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Can you try submitting a very simple job to the cluster. On Jul 16, 2014, at 10:25 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Yes it is appearing on the Spark UI, and remains there with state as RUNNING till I press Ctrl+C in the terminal to kill the execution. Barring the statements to create the spark context, if I copy paste the lines of my code in spark shell, runs perfectly giving the desired output. ~Sarath On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta soumya.sima...@gmail.com wrote: When you submit your job, it should appear on the Spark UI. Same with the REPL. Make sure you job is submitted to the cluster properly. On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Soumya, Data is very small, 500+ lines in each file. Removed last 2 lines and placed this at the end matched.collect().foreach(println);. Still no luck. It's been more than 5min, the execution is still running. Checked logs, nothing in stdout. In stderr I don't see anything
Re: Simple record matching using Spark SQL
What version are you running? Could you provide a jstack of the driver and executor when it is hanging? On Thu, Jul 17, 2014 at 10:55 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Added below 2 lines just before the sql query line - *...* *file1_schema.count;* *file2_schema.count;* *...* and it started working. But I couldn't get the reason. Can someone please explain me? What was happening earlier and what is happening with addition of these 2 lines? ~Sarath On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: No Sonal, I'm not doing any explicit call to stop context. If you see my previous post to Michael, the commented portion of the code is my requirement. When I run this over standalone spark cluster, the execution keeps running with no output or error. After waiting for several minutes I'm killing it by pressing Ctrl+C in the terminal. But the same code runs perfectly when executed from spark shell. ~Sarath On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal sonalgoy...@gmail.com wrote: Hi Sarath, Are you explicitly stopping the context? sc.stop() Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Michael, Soumya, Can you please check and let me know what is the issue? what am I missing? Let me know if you need any logs to analyze. ~Sarath On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Michael, Tried it. It's correctly printing the line counts of both the files. Here's what I tried - *Code:* *package test* *object Test4 {* * case class Test(fld1: String, * * fld2: String, * * fld3: String, * * fld4: String, * * fld5: String, * * fld6: Double, * * fld7: String);* * def main(args: Array[String]) {* *val conf = new SparkConf()* *.setMaster(args(0))* * .setAppName(SQLTest)* * .setSparkHome(args(1))* * .set(spark.executor.memory, 2g);* *val sc = new SparkContext(conf);* *sc.addJar(test1-0.1.jar);* *val file1 = sc.textFile(args(2));* *println(file1.count());* *val file2 = sc.textFile(args(3));* *println(file2.count());* *//val sq = new SQLContext(sc);* *//import sq._* *//val file1_recs: RDD[Test] = file1.map(_.split(,)).map(l = Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));* *//val file2_recs: RDD[Test] = file2.map(_.split(,)).map(s = Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));* *//val file1_schema = sq.createSchemaRDD(file1_recs);* *//val file2_schema = sq.createSchemaRDD(file2_recs);* *//file1_schema.registerAsTable(file1_tab);* *//file2_schema.registerAsTable(file2_tab);* *//val matched = sq.sql(select * from file1_tab l join file2_tab s on + * *// l.fld7=s.fld7 where l.fld2=s.fld2 and + * *// l.fld3=s.fld3 and l.fld4=s.fld4 and + * *// l.fld6=s.fld6);* *//matched.collect().foreach(println);* * }* *}* *Execution:* *export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar* *export CONFIG_OPTS=-Dspark.jars=test1-0.1.jar* *java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 /usr/local/spark-1.0.1-bin-hadoop1 hdfs://master:54310/user/hduser/file1.csv hdfs://master:54310/user/hduser/file2.csv* ~Sarath On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust mich...@databricks.com wrote: What if you just run something like: *sc.textFile(hdfs://localhost:54310/user/hduser/file1.csv).count()* On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Yes Soumya, I did 
it. First I tried with the example available in the documentation (example using people table and finding teenagers). After successfully running it, I moved on to this one which is starting point to a bigger requirement for which I'm evaluating Spark SQL. On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Can you try submitting a very simple job to the cluster. On Jul 16, 2014, at 10:25 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Yes it is appearing on the Spark UI, and remains there with state as RUNNING till I press Ctrl+C in the terminal to kill the execution. Barring the statements to create the spark context, if I copy paste the lines of my code in spark shell, runs perfectly giving the desired output. ~Sarath On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta soumya.sima...@gmail.com wrote: When you submit your job, it should appear on the Spark UI. Same with the REPL. Make sure you job is submitted to the cluster properly. On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Soumya, Data is very small, 500+ lines in each file. Removed last
Re: class after join
If you intern the string it will be more efficient, but still significantly more expensive than the class based approach. ** VERY EXPERIMENTAL ** We are working with EPFL on a lightweight syntax for naming the results of spark transformations in scala (and are going to make it interoperate with SQL). Sparse details here: https://github.com/scala-records/scala-records Stay tuned for more... Michael On Thu, Jul 17, 2014 at 4:47 AM, Luis Guerra luispelay...@gmail.com wrote: Thank you for your fast reply. We are considering this Map[String, String] solution, but there are some details that we do not control yet. What would happen if we have different data types for different fields? Also, with this solution, we have to repeat the field names for every row that we have, is this efficient? Regarding the solution with composition, the key would be repeated in the new class, whereas it is only necessary once after the join, isn't it? On Thu, Jul 17, 2014 at 10:25 AM, Sean Owen so...@cloudera.com wrote: If what you have is a large number of named strings, why not use a Map[String,String] to represent them? If you're approaching a class with 22 String fields anyway, it probably makes more sense. You lose a bit of compile-time checking, but gain flexibility. Also, merging two Maps to make a new one is pretty simple, compared to making many of these values classes. (Although, if you otherwise needed a class that represented all of the things in class A and class B, this could be done easily with composition, a class with an A and a B inside.) On Thu, Jul 17, 2014 at 9:15 AM, Luis Guerra luispelay...@gmail.com wrote: Hi all, I am a newbie Spark user with many doubts, so sorry if this is a silly question. I am dealing with tabular data formatted as text files, so when I first load the data, my code is like this: case class data_class( V1: String, V2: String, V3: String, V4: String, V5: String, V6: String, V7: String) val data= sc.textFile(data_path) .map(x = { val fields = (x+ ).split(\t) data_class(fields(0).trim(),fields(1).trim(),fields(2).trim(),fields(3).trim(), fields(4).trim(), fields(5).trim(),fields(6).trim()) }) I am doing this because I would like to access to each position using the variable name (V1...V7). Is there any other way of doing this? Also related to this question, if I have data with more than 22 variables, I am restringed to use class instead of case class. However, this kind of solution has many restrictions mainly related to getter methods. Is there any other way of doing this? And finally, one of my main problems comes after operations of different data variables. For instance, if I have two different variables (data1 and data2), and I want to join them both as: val data3 = data1.keyBy(_.V1).leftOuterJoin(data2.keyBy(_.V1)) Then I have to post process data3 in order to obtain a new class that contains those variables from data1 and also those variables from data2. As data3 is (key, (data1, data2)), do I have to create a new different class with all these attributes from data1 and data2? This is kind of annoying when there are many attributes. Thanks in advance, Best
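As a small illustration of the composition idea from this thread, reusing the data_class and join from the original question (and assuming both data1 and data2 hold data_class records; the right side is optional because of the left outer join):

// One class holding a record from each side of the join, instead of a new
// flattened class repeating all fourteen fields.
case class Joined(key: String, left: data_class, right: Option[data_class])

val data3 = data1.keyBy(_.V1).leftOuterJoin(data2.keyBy(_.V1))
  .map { case (key, (l, r)) => Joined(key, l, r) }

// Fields are then reached through the components, e.g. j.left.V2 or j.right.map(_.V3).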
Is there a way to get previous/other keys' state in Spark Streaming?
Hi guys, I'm sure some of you have a similar use case and I want to know how you deal with it. In our application, we want to check the previous state of some keys and compare it with their current state. AFAIK, Spark Streaming does not have key-value access. So what I am currently doing is storing the previous and current data as one data type in the state, calling updateStateByKey in every interval, and working on the state (which holds the previous and current data) of the generated new DStream. But it has limitations: 1. cannot access keys that do not appear in this time interval. 2. cannot update key A's state from key B's if only key B appears in this time interval. Am I doing something wrong? Any suggestions? Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
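A minimal sketch of the state layout described above (names and value types are illustrative; pairs is assumed to be a DStream of (key, value) pairs):

// Each key's state carries both its previous and its current value.
case class KeyState(previous: Option[Double], current: Option[Double])

val stateStream = pairs.updateStateByKey[KeyState] { (newValues: Seq[Double], old: Option[KeyState]) =>
  val prevCurrent = old.flatMap(_.current)
  val newCurrent = newValues.lastOption.orElse(prevCurrent)
  Some(KeyState(previous = prevCurrent, current = newCurrent))
}
// Downstream operations can compare state.previous with state.current per key;
// as noted above, this still gives no direct access to other keys' state.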
Re: Spark scheduling with Capacity scheduler
It's possible using the --queue argument of spark-submit. Unfortunately this is not documented on http://spark.apache.org/docs/latest/running-on-yarn.html but it appears if you just type spark-submit --help or spark-submit with no arguments. Matei On Jul 17, 2014, at 2:33 AM, Konstantin Kudryavtsev kudryavtsev.konstan...@gmail.com wrote: Hi all, I'm using HDP 2.0, YARN. I'm running both MapReduce and Spark jobs on this cluster, is it possible somehow use Capacity scheduler for Spark jobs management as well as MR jobs? I mean, I'm able to send MR job to specific queue, may I do the same with Spark job? thank you in advance Thank you, Konstantin Kudryavtsev
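For example, to send an application to a specific capacity-scheduler queue (the queue name and application details here are placeholders):

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
  --master yarn-cluster \
  --queue thequeue \
  lib/spark-examples*.jar 10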
Re: Spark scheduling with Capacity scheduler
unsubscribe From: Matei Zaharia matei.zaha...@gmail.com To: user@spark.apache.org Date: 07/17/2014 12:41 PM Subject:Re: Spark scheduling with Capacity scheduler It's possible using the --queue argument of spark-submit. Unfortunately this is not documented on http://spark.apache.org/docs/latest/running-on-yarn.html but it appears if you just type spark-submit --help or spark-submit with no arguments. Matei On Jul 17, 2014, at 2:33 AM, Konstantin Kudryavtsev kudryavtsev.konstan...@gmail.com wrote: Hi all, I'm using HDP 2.0, YARN. I'm running both MapReduce and Spark jobs on this cluster, is it possible somehow use Capacity scheduler for Spark jobs management as well as MR jobs? I mean, I'm able to send MR job to specific queue, may I do the same with Spark job? thank you in advance Thank you, Konstantin Kudryavtsev
Error while running example/scala application using spark-submit
Hi, I am receiving below error while submitting any spark example or scala application. Really appreciate any help. spark version = 1.0.0 hadoop version = 2.4.0 Windows/Standalone mode 14/07/17 22:13:19 INFO TaskSchedulerImpl: Cancelling stage 0 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent failure: Exception failure in TID 6 o n host java.lang.NullPointerException java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) org.apache.hadoop.util.Shell.runCommand(Shell.java:445) org.apache.hadoop.util.Shell.run(Shell.java:418) org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650) org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873) org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853) org.apache.spark.util.Utils$.fetchFile(Utils.scala:421) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:332) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) scala.collection.mutable.HashMap.foreach(HashMap.scala:98) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:330) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Exception in thread delete Spark temp dir C:\Users\~1\AppData\Local\Temp\spark-88e26679-5a8f-4a37-bf02-41f4b2b46d8f java.io.IOException: Failed to delete: C:\User s\~1\AppData\Local\Temp\spark-88e26679-5a8f-4a37-bf02-41f4b2b46d8f\jars\spark-examples-1.0.0-hadoop2.4.0.jar at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:599) at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:593) at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:592) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:592) at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:593) at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:592) at
Need help on Spark UDF (Join) Performance tuning .
Hello Experts, I am facing a performance problem when I use a UDF in the join condition. Please help me to tune the query. Please find the details below. shark> select count(*) from table1; OK 151096 Time taken: 7.242 seconds shark> select count(*) from table2; OK 938 Time taken: 1.273 seconds Without UDF: shark> SELECT count(pvc1.time) FROM table2 pvc2 JOIN table1 pvc1 WHERE pvc1.col1 = pvc2.col2 AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS'); OK 328 Time taken: 200.487 seconds shark> SELECT count(pvc1.time) FROM table2 pvc2 JOIN table1 pvc1 WHERE (pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2) AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS'); OK 331 Time taken: 292.86 seconds With UDF: shark> SELECT count(pvc1.time) FROM table2 pvc2 JOIN table1 pvc1 WHERE testCompare(pvc1.col1, pvc1.col2, pvc2.col1, pvc2.col2) AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS'); OK 331 Time taken: 3718.23 seconds The UDF version takes far longer to run. testCompare is a UDF that simply does pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2. Please let me know what the issue is here. Thanks and Regards, Sankar S.
Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)
On Wed, Jul 16, 2014 at 12:36 PM, Matt Work Coarr mattcoarr.w...@gmail.com wrote: Thanks Marcelo, I'm not seeing anything in the logs that clearly explains what's causing this to break. One interesting point that we just discovered is that if we run the driver and the slave (worker) on the same host it runs, but if we run the driver on a separate host it does not run. By the executor log, I meant the log of the process launched by the worker, not the worker's own log. In my CDH-based Spark install, those end up in /var/run/spark/work. If you look at your worker log, you'll see it's launching the executor process. So there should be something there. Since you say it works when both are run in the same node, that probably points to some communication issue, since the executor needs to connect back to the driver. Check to see if you don't have any firewalls blocking the ports Spark tries to use. (That's one of the non-resource-related cases that will cause that message.) -- Marcelo
Re: Speeding up K-Means Clustering
Please try val parsedData3 = data3.repartition(12).map(_.split("\t").map(_.toDouble)).cache() and check the storage and driver/executor memory in the WebUI. Make sure the data is fully cached. -Xiangrui On Thu, Jul 17, 2014 at 5:09 AM, Ravishankar Rajagopalan viora...@gmail.com wrote: Hi Xiangrui, Yes I am using Spark v0.9 and am not running it in local mode. I did the memory setting using export SPARK_MEM=4G before starting the Spark instance. Also previously, I was starting it with -c 1 but changed it to -c 12 since it is a 12 core machine. It did bring down the time taken to less than 200 seconds from over 700 seconds. I am not sure how to repartition the data to match the CPU cores. How do I do it? Thank you. Ravi On Thu, Jul 17, 2014 at 3:17 PM, Xiangrui Meng men...@gmail.com wrote: Is it v0.9? Did you run in local mode? Try to set --driver-memory 4g and repartition your data to match the number of CPU cores such that the data is evenly distributed. You need 1m * 50 * 8 ~ 400MB to store the data. Make sure there is enough memory for caching. -Xiangrui On Thu, Jul 17, 2014 at 1:48 AM, Ravishankar Rajagopalan viora...@gmail.com wrote: I am trying to use MLlib for K-Means clustering on a data set with 1 million rows and 50 columns (all columns have double values) which is on HDFS (raw txt file is 28 MB). I initially tried the following: val data3 = sc.textFile("hdfs://...inputData.txt") val parsedData3 = data3.map(_.split('\t').map(_.toDouble)) val numIterations = 10 val numClusters = 200 val clusters = KMeans.train(parsedData3, numClusters, numIterations) This took me nearly 850 seconds. I tried using persist with the MEMORY_ONLY option hoping that this would significantly speed up the algorithm: val data3 = sc.textFile("hdfs://...inputData.txt") val parsedData3 = data3.map(_.split('\t').map(_.toDouble)) parsedData3.persist(MEMORY_ONLY) val numIterations = 10 val numClusters = 200 val clusters = KMeans.train(parsedData3, numClusters, numIterations) This resulted in only a marginal improvement and took around 720 seconds. Is there any other way to speed up the algorithm further? Thank you. Regards, Ravi
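For reference, a minimal sketch of the suggested setup, put together end to end (assuming Spark 0.9 MLlib as in this thread, a 12-core machine, and an existing SparkContext sc; the HDFS path is the placeholder from the original post):

    import org.apache.spark.mllib.clustering.KMeans

    val data3 = sc.textFile("hdfs://...inputData.txt")
    // repartition to match the 12 cores so every iteration uses all of them, then cache
    val parsedData3 = data3.repartition(12).map(_.split("\t").map(_.toDouble)).cache()
    parsedData3.count()   // forces the cache to be populated before KMeans starts iterating
    val clusters = KMeans.train(parsedData3, 200, 10)   // numClusters = 200, numIterations = 10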
Re: Spark Streaming Json file groupby function
hi TD, Thanks for the solutions for my previous post. I am running into another issue: I am getting data from a json file, and I am trying to parse it and map it to a record given below val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString)) case class Record(ID:String, name:String, score:Int, school:String) when i am trying to do this i am getting an error [error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36: value toInt is not a member of Any [error] lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString)) [error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36: value toInt is not a member of Any I tried giving immutable.Map[Any,Int] and tried converting the Int to a String; my application compiled but i am getting an exception when i am running it 14/07/17 17:11:30 ERROR Executor: Exception in task ID 6 java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) Basically i am trying to do a max operation in my sparksql. please let me know if there is any workaround for this. Thanks, -Srinivas. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10060.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Is there a way to get previous/other keys' state in Spark Streaming?
For accessing the previous version, I would do it the same way. :) 1. Can you elaborate on what you mean by that with an example? What do you mean by accessing keys? 2. Yeah, that is hard to do without the ability to do point lookups into an RDD, which we don't support yet. You could try embedding the related key in the values of the keys that need it. That is, B will be present in the value of key A. Then put this transformed DStream through updateStateByKey. TD
Re: Spark Streaming Json file groupby function
This is a basic scala problem. You cannot apply toInt to Any. Try doing toString.toInt. For such scala issues, I recommend trying it out in the Scala shell. For example, you could have tried this out as the following. [tdas @ Xion streaming] scala Welcome to Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45). Type in expressions to have them evaluated. Type :help for more information. scala> 12.asInstanceOf[Any].toInt <console>:8: error: value toInt is not a member of Any 12.asInstanceOf[Any].toInt ^ scala> 12.asInstanceOf[Any].toString.toInt res1: Int = 12 scala> On Thu, Jul 17, 2014 at 10:32 AM, srinivas kusamsrini...@gmail.com wrote: hi TD, Thanks for the solutions for my previous post. I am running into another issue: I am getting data from a json file, and I am trying to parse it and map it to a record given below val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString)) case class Record(ID:String, name:String, score:Int, school:String) when i am trying to do this i am getting an error [error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36: value toInt is not a member of Any [error] lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString)) [error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36: value toInt is not a member of Any I tried giving immutable.Map[Any,Int] and tried converting the Int to a String; my application compiled but i am getting an exception when i am running it 14/07/17 17:11:30 ERROR Executor: Exception in task ID 6 java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) Basically i am trying to do a max operation in my sparksql. please let me know if there is any workaround for this. Thanks, -Srinivas. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10060.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
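Applied to the snippet from the original post, a sketch of the corrected mapping looks like the following (assuming lines is the DStream of JSON strings from the question; field names and the Record class are taken from it):

    import scala.util.parsing.json.JSON

    case class Record(ID: String, name: String, score: Int, school: String)

    val jsonf = lines.map(JSON.parseFull(_))
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[Any, Any]])
      .map(data => Record(
        data("ID").toString,
        data("name").toString,
        data("score").toString.toInt,   // convert the Any to a String first, then parse the Int
        data("school").toString))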
Re: Error while running example/scala application using spark-submit
Thanks Sean, 1) Yes, I am trying to run locally without Hadoop. 2) I also see the error in the provided link while launching spark-shell, but after launch I am able to execute the same code I have in the sample application (read a local file and perform some reduction operation). It only fails when going through the submit command. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-tp10056p10064.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming timing considerations
You have to define what range of records needs to be filtered out in every windowed RDD, right? For example, when the DStream.window has data from times 0 - 8 seconds by DStream time, you only want to retain data that falls into, say, 4 - 8 seconds by application time. This latter is the application-level time window that you need to define in the transform function. What may help is that there is another version of transform which allows you to get the current DStream time (that is, it will give the value 8) from which you can calculate the app-time-window 4 - 8. val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform((windowedRDD: RDD[...], dstreamTime: Time) => { val currentAppTimeWindowStart = dstreamTime - appTimeWindowSize // define the window over the timestamp that you want to process val currentAppTimeWindowEnd = dstreamTime val filteredRDD = windowedRDD.filter(r => r._1 <= currentAppTimeWindowEnd && r._1 > currentAppTimeWindowStart) // filter and retain only the records that fall in the current app-time window return filteredRDD }) Hope this helps! TD On Thu, Jul 17, 2014 at 5:54 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi TD, I have been able to filter the first WindowedRDD, but I am not sure how to make a generic filter. The larger window is 8 seconds and I want to fetch 4 seconds based on the application timestamp. I have seen an earlier post which suggests a timeStampBasedWindow but I am not sure how to make the timestampBasedWindow in the following example. val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(windowedRDD => { //val timeStampBasedWindow = ??? // define the window over the timestamp that you want to process val filteredRDD = windowedRDD.filter(_._1 > 4) // filter and retain only the records that fall in the timestamp-based window return filteredRDD }) Consider the input tuples as (1,23),(1.2,34) . . . . . (3.8,54)(4,413) . . . where the key is the timestamp. Regards, Laeeq On Saturday, July 12, 2014 8:29 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, Thanks I will try to implement it. Regards, Laeeq On Saturday, July 12, 2014 4:37 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This is not in the current streaming API. Queue stream is useful for testing with generated RDDs, but not for actual data. For an actual data stream, the slack time can be implemented by doing DStream.window on a larger window that takes slack time into consideration, and then the required application-time-based window of data filtered out. For example, if you want a slack time of 1 minute and batches of 10 seconds, then do a window operation of 70 seconds, then in each RDD filter out the records with the desired application time and process them. TD On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, In the spark streaming paper, slack time has been suggested for delaying the batch creation in case of external timestamps. I don't see any such option in streamingcontext. Is it available in the API? Also going through the previous posts, queueStream has been suggested for this. I looked into the queueStream example. // Create and push some RDDs into Queue for (i <- 1 to 30) { rddQueue += ssc.sparkContext.makeRDD(1 to 10) Thread.sleep(1000) } The only thing I am unsure of is how to make batches (basic RDDs) out of a stream coming in on a port. Regards, Laeeq
Re: Error while running example/scala application using spark-submit
Hi Sean RE: Windows and hadoop 2.4.x HortonWorks - all the hype aside - only supports Windows Server 2008/2012. So this general concept of supporting Windows is bunk. Given that - and since the vast majority of Windows users do not happen to have Windows Server on their laptop - do you have any further insight into what it means to say that hadoop 2.4.x supports Windows ? Are you referring to cygwin support? 2014-07-17 11:13 GMT-07:00 Sean Owen so...@cloudera.com: I imagine the issue is ultimately combination of Windows and (stock?) Apache Hadoop. I know that in the past, operations like 'chmod' didn't work on Windows since it assumed the existence of POSIX binaries. That should be long since patched up for 2.4.x but there may be a gotcha here that others can comment on. Do I understand that you're trying to run entirely locally, without Hadoop at all? Then I think this sounds like https://issues.apache.org/jira/browse/SPARK-2356 which does deserve attention. The Hadoop APIs get tickled even when they're not used, and this can cause some initialization gotchas on Windows in particular. On Thu, Jul 17, 2014 at 6:16 PM, ShanxT mail4.shash...@gmail.com wrote: Hi, I am receiving below error while submitting any spark example or scala application. Really appreciate any help. spark version = 1.0.0 hadoop version = 2.4.0 Windows/Standalone mode 14/07/17 22:13:19 INFO TaskSchedulerImpl: Cancelling stage 0 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent failure: Exception failure in TID 6 o n host java.lang.NullPointerException java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) org.apache.hadoop.util.Shell.runCommand(Shell.java:445) org.apache.hadoop.util.Shell.run(Shell.java:418) org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650) org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873) org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853) org.apache.spark.util.Utils$.fetchFile(Utils.scala:421) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:332) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) scala.collection.mutable.HashMap.foreach(HashMap.scala:98) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) org.apache.spark.executor.Executor.org $apache$spark$executor$Executor$$updateDependencies(Executor.scala:330) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at
Re: GraphX Pragel implementation
If your sendMsg function needs to know the incoming messages as well as the vertex value, you could define VD to be a tuple of the vertex value and the last received message. The vprog function would then store the incoming messages into the tuple, allowing sendMsg to access them. For example, if the vertex value was a String and the message type was an Int, you could call Pregel as follows: val graph: Graph[String, _] = ... graph.mapVertices((id, attr) => (attr, 0)).pregel(0)( (id, attr: (String, Int), msg: Int) => (attr._1, msg), edge => Iterator(...), // can use edge.srcAttr._2 and edge.dstAttr._2 to access the messages (a: Int, b: Int) => a + b) Ankur http://www.ankurdave.com/
Custom Metrics Sink
What is the preferred way of adding a custom metrics sink to Spark? I noticed that the Sink Trait has been private since April, so I cannot simply extend Sink in an outside package, but I would like to avoid having to create a custom build of Spark. Is this possible? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Metrics-Sink-tp10068.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error while running example/scala application using spark-submit
I am probably the wrong person to ask as I never use Hadoop on Windows. But from looking at the code just now it is clearly trying to accommodate Windows shell commands. Yes I would not be surprised if it still needs Cygwin. A slightly broader point is that ideally it doesnt matter whether Hadoop works on your platform if using Spark locally without Hadoop. I don't know how feasible it is to separate but there may be some tweaks to avoid initializing Hadoop in more cases. See the JIRA. On Jul 17, 2014 7:52 PM, Stephen Boesch java...@gmail.com wrote: Hi Sean RE: Windows and hadoop 2.4.x HortonWorks - all the hype aside - only supports Windows Server 2008/2012. So this general concept of supporting Windows is bunk. Given that - and since the vast majority of Windows users do not happen to have Windows Server on their laptop - do you have any further insight into what it means to say that hadoop 2.4.x supports Windows ? Are you referring to cygwin support? 2014-07-17 11:13 GMT-07:00 Sean Owen so...@cloudera.com: I imagine the issue is ultimately combination of Windows and (stock?) Apache Hadoop. I know that in the past, operations like 'chmod' didn't work on Windows since it assumed the existence of POSIX binaries. That should be long since patched up for 2.4.x but there may be a gotcha here that others can comment on. Do I understand that you're trying to run entirely locally, without Hadoop at all? Then I think this sounds like https://issues.apache.org/jira/browse/SPARK-2356 which does deserve attention. The Hadoop APIs get tickled even when they're not used, and this can cause some initialization gotchas on Windows in particular. On Thu, Jul 17, 2014 at 6:16 PM, ShanxT mail4.shash...@gmail.com wrote: Hi, I am receiving below error while submitting any spark example or scala application. Really appreciate any help. 
spark version = 1.0.0 hadoop version = 2.4.0 Windows/Standalone mode 14/07/17 22:13:19 INFO TaskSchedulerImpl: Cancelling stage 0 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent failure: Exception failure in TID 6 o n host java.lang.NullPointerException java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) org.apache.hadoop.util.Shell.runCommand(Shell.java:445) org.apache.hadoop.util.Shell.run(Shell.java:418) org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650) org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873) org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853) org.apache.spark.util.Utils$.fetchFile(Utils.scala:421) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:332) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) scala.collection.mutable.HashMap.foreach(HashMap.scala:98) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) org.apache.spark.executor.Executor.org $apache$spark$executor$Executor$$updateDependencies(Executor.scala:330) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at
Re: Is there a way to get previous/other keys' state in Spark Streaming?
Hi TD, Thank you for the quick reply and for backing my approach. :) 1) The example is this: 1. In the first 2 second interval, after updateStateByKey, I get a few keys and their states, say, (a -> 1, b -> 2, c -> 3) 2. In the following 2 second interval, I only receive c and d and their values. But I want to update/display the state of a and b accordingly. * It seems I have no way to access a and b and get their states. * also, do I have a way to show all the existing states? I guess the approach to solve this will be similar to what you mentioned for 2). But the difficulty is that, if I want to display all the existing states, I need to bundle all the rest of the keys into one key. Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com wrote: For accessing the previous version, I would do it the same way. :) 1. Can you elaborate on what you mean by that with an example? What do you mean by accessing keys? 2. Yeah, that is hard to do without the ability to do point lookups into an RDD, which we don't support yet. You could try embedding the related key in the values of the keys that need it. That is, B will be present in the value of key A. Then put this transformed DStream through updateStateByKey. TD
Re: Difference among batchDuration, windowDuration, slideDuration
Thanks Tathagata, so can I say the RDD size (from the stream) is the window size, and the overlap between 2 adjacent RDDs is the slide size? But I still don't understand what the batch size is; why do we need it, since data processing is RDD by RDD, right? And does Spark chop the data into RDDs at the very beginning? Do you allow event-by-event processing, for example filtering? On Wed, Jul 16, 2014 at 6:47 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I guess this is better explained in the streaming programming guide's http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations window operation subsection. For completeness' sake, it's worth mentioning the following. Window operations can be applied on other windowed-DStreams as well. So the correct thing to say is that the slide duration of the window operations must be a multiple of the sliding interval of the parent DStream. For simple, non-window dstreams, this sliding interval is the same as the batch interval. // say batch interval is 2 seconds inputstream // moves every batch interval, 2 seconds inputstream.window(Seconds(3)) // not allowed, must be multiple of 2 seconds inputstream.window(Seconds(4)) // allowed, moves every 2 seconds (therefore sliding interval is 2 seconds) inputstream.window(Seconds(10), Seconds(4)) // allowed, moves every 4 seconds (therefore sliding interval is 4 seconds) inputstream.window(Seconds(10), Seconds(4)).window(Seconds(6)) // not allowed, as window interval must be multiple of parent's sliding interval which is 4 seconds inputstream.window(Seconds(10), Seconds(4)).window(Seconds(8)) // allowed Hopefully that made sense :) TD On Wed, Jul 16, 2014 at 12:41 PM, Walrus theCat walrusthe...@gmail.com wrote: I did not! On Wed, Jul 16, 2014 at 12:31 PM, aaronjosephs aa...@placeiq.com wrote: The only other thing to keep in mind is that window duration and slide duration have to be multiples of batch duration, IDK if you made that fully clear -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Difference-among-batchDuration-windowDuration-slideDuration-tp9966p9973.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error: No space left on device
Hi Xiangrui, Thanks. I have taken your advice and set all 5 of my slaves to be c3.4xlarge. In this case /mnt and /mnt2 have plenty of space by default. I now do sc.textFile(blah).repartition(N).map(...).cache() with N=80 and spark.executor.memory to be 20gb and --driver-memory 20g. So far things seem more stable. Thanks for the help, Chris On Thu, Jul 17, 2014 at 12:56 AM, Xiangrui Meng men...@gmail.com wrote: Set N be the total number of cores on the cluster or less. sc.textFile doesn't always give you that number, depends on the block size. For MovieLens, I think the default behavior should be 2~3 partitions. You need to call repartition to ensure the right number of partitions. Which EC2 instance type did you use? I usually use m3.2xlarge or c? instances that come with SSD and 1G or 10G network. For those instances, you should see local drives mounted at /mnt, /mnt2, /mnt3, ... Make sure there was no error when you used the ec2 script to launch the cluster. It is a little strange to see 94% of / was used on a slave. Maybe shuffle data went to /. I'm not sure which settings went wrong. I recommend trying re-launching a cluster with m3.2xlarge instances and using the default settings (don't set anything in SparkConf). Submit the application with --driver-memory 20g. The running times are slower than what I remember, but it depends on the instance type. Best, Xiangrui On Wed, Jul 16, 2014 at 10:18 PM, Chris DuBois chris.dub...@gmail.com wrote: Hi Xiangrui, I will try this shortly. When using N partitions, do you recommend N be the number of cores on each slave or the number of cores on the master? Forgive my ignorance, but is this best achieved as an argument to sc.textFile? The slaves on the EC2 clusters start with only 8gb of storage, and it doesn't seem that /mnt/spark and /mnt2/spark are mounted anywhere else by default. Looking at spark-ec2/setup-slaves.sh, it appears that these are only mounted if the instance type begins with r3. (Or am I not reading that right?) My slaves are a different instance type, and currently look like this: FilesystemSize Used Avail Use% Mounted on /dev/xvda17.9G 7.3G 515M 94% / tmpfs 7.4G 4.0K 7.4G 1% /dev/shm /dev/xvdv 500G 2.5G 498G 1% /vol I have been able to finish ALS on MovieLens 10M only twice, taking 221s and 315s for 20 iterations at K=20 and lambda=0.01. Does that timing sound about right, or does it point to a poor configuration? The same script with MovieLens 1M runs fine in about 30-40s with the same settings. (In both cases I'm training on 70% of the data.) Thanks for your help! Chris On Wed, Jul 16, 2014 at 4:29 PM, Xiangrui Meng men...@gmail.com wrote: For ALS, I would recommend repartitioning the ratings to match the number of CPU cores or even less. ALS is not computation heavy for small k but communication heavy. Having small number of partitions may help. For EC2 clusters, we use /mnt/spark and /mnt2/spark as the default local directory because they are local hard drives. Did your last run of ALS on MovieLens 10M-100K with the default settings succeed? -Xiangrui On Wed, Jul 16, 2014 at 8:00 AM, Chris DuBois chris.dub...@gmail.com wrote: Hi Xiangrui, I accidentally did not send df -i for the master node. 
Here it is at the moment of failure: Filesystem Inodes IUsed IFree IUse% Mounted on /dev/xvda1 524288 280938 243350 54% / tmpfs 3845409 1 3845408 1% /dev/shm /dev/xvdb 10002432 1027 10001405 1% /mnt /dev/xvdf 10002432 16 10002416 1% /mnt2 /dev/xvdv 524288000 13 524287987 1% /vol I am using default settings now, but is there a way to make sure that the proper directories are being used? How many blocks/partitions do you recommend? Chris On Wed, Jul 16, 2014 at 1:09 AM, Chris DuBois chris.dub...@gmail.com wrote: Hi Xiangrui, Here is the result on the master node: $ df -i Filesystem Inodes IUsed IFree IUse% Mounted on /dev/xvda1 524288 273997 250291 53% / tmpfs 1917974 1 1917973 1% /dev/shm /dev/xvdv 524288000 30 524287970 1% /vol I have reproduced the error while using the MovieLens 10M data set on a newly created cluster. Thanks for the help. Chris On Wed, Jul 16, 2014 at 12:22 AM, Xiangrui Meng men...@gmail.com wrote: Hi Chris, Could you also try `df -i` on the master node? How many blocks/partitions did you set? In the current implementation, ALS doesn't clean the shuffle data because the operations are chained together. But it shouldn't run out of disk space on the MovieLens dataset, which is small. spark-ec2 script sets /mnt/spark and
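Related to the question above about making sure the proper directories are used, a minimal sketch of pointing the scratch space at the large ephemeral drives explicitly (assuming Spark 1.0 on EC2 with the usual /mnt and /mnt2 mounts; spark-ec2 normally configures this already, so this just makes it explicit):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("ALS")
      .set("spark.local.dir", "/mnt/spark,/mnt2/spark")   // comma-separated scratch dirs used for shuffle files
    val sc = new SparkContext(conf)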
Re: using multiple dstreams together (spark streaming)
Thanks! On Wed, Jul 16, 2014 at 6:34 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Have you taken a look at DStream.transformWith( ... )? That allows you to apply an arbitrary transformation between RDDs (of the same timestamp) of two different streams. So you can do something like this. 2s-window-stream.transformWith(1s-window-stream, (rdd1: RDD[...], rdd2: RDD[...]) => { ... // return a new RDD }) And streamingContext.transform() extends it to N DStreams. :) Hope this helps! TD On Wed, Jul 16, 2014 at 10:42 AM, Walrus theCat walrusthe...@gmail.com wrote: hey at least it's something (thanks!) ... not sure what i'm going to do if i can't find a solution (other than not use spark) as i really need these capabilities. anyone got anything else? On Wed, Jul 16, 2014 at 10:34 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: hum... maybe consuming all streams at the same time with an actor that would act as a new DStream source... but this is just a random idea... I don't really know if that would be a good idea or even possible. 2014-07-16 18:30 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Yeah -- I tried the .union operation and it didn't work for that reason. Surely there has to be a way to do this, as I imagine this is a commonly desired goal in streaming applications? On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I'm joining several kafka dstreams using the join operation but you have the limitation that the duration of the batch has to be the same, i.e. a 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
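A rough sketch of the "compare the 1-second window against the 2-second window" idea using transformWith (assuming Spark 1.0 streaming, a 1-second batch interval, and a hypothetical inputStream of records; the per-window delta logic here is only illustrative):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Seconds

    val oneSecCounts = inputStream.window(Seconds(1)).count()   // slides every batch interval (1s)
    val twoSecCounts = inputStream.window(Seconds(2)).count()   // also slides every 1s, so the slide durations match
    val deltas = oneSecCounts.transformWith(twoSecCounts,
      (a: RDD[Long], b: RDD[Long]) => a.cartesian(b).map { case (x, y) => y - x })
    deltas.print()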
Re: Supported SQL syntax in Spark SQL
FYI: I've created SPARK-2560 https://issues.apache.org/jira/browse/SPARK-2560 to track creating SQL reference docs for Spark SQL. On Mon, Jul 14, 2014 at 2:06 PM, Michael Armbrust mich...@databricks.com wrote: You can find the parser here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala In general the hive parser provided by HQL is much more complete at the moment. Long term we will likely stop using parser combinators and either write a more complete parser, or adopt one from an existing project. On Mon, Jul 14, 2014 at 12:25 AM, Martin Gammelsæter martingammelsae...@gmail.com wrote: I am very interested in the original question as well, is there any list (even if it is simply in the code) of all supported syntax for Spark SQL? On Mon, Jul 14, 2014 at 6:41 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Are you sure the code running on the cluster has been updated? I launched the cluster using spark-ec2 from the 1.0.1 release, so I’m assuming that’s taken care of, at least in theory. I just spun down the clusters I had up, but I will revisit this tomorrow and provide the information you requested. Nick -- Mvh. Martin Gammelsæter 92209139
replacement for SPARK_LIBRARY_PATH ?
I used to use SPARK_LIBRARY_PATH to specify the location of native libs for lzo compression when using spark 0.9.0. The references to that environment variable have disappeared from the docs for spark 1.0.1 and it's not clear how to specify the location for lzo. Any guidance?
Re: Error: No space left on device
Hi, I also have some issues with repartition. In my program, I consume data from Kafka. After I consume data, I use repartition(N). However, although I set N to be 120, there are around 18 executors allocated for my reduce stage. I am not sure how the repartition command works ton ensure the parallelism. Bill On Thu, Jul 17, 2014 at 12:56 AM, Xiangrui Meng men...@gmail.com wrote: Set N be the total number of cores on the cluster or less. sc.textFile doesn't always give you that number, depends on the block size. For MovieLens, I think the default behavior should be 2~3 partitions. You need to call repartition to ensure the right number of partitions. Which EC2 instance type did you use? I usually use m3.2xlarge or c? instances that come with SSD and 1G or 10G network. For those instances, you should see local drives mounted at /mnt, /mnt2, /mnt3, ... Make sure there was no error when you used the ec2 script to launch the cluster. It is a little strange to see 94% of / was used on a slave. Maybe shuffle data went to /. I'm not sure which settings went wrong. I recommend trying re-launching a cluster with m3.2xlarge instances and using the default settings (don't set anything in SparkConf). Submit the application with --driver-memory 20g. The running times are slower than what I remember, but it depends on the instance type. Best, Xiangrui On Wed, Jul 16, 2014 at 10:18 PM, Chris DuBois chris.dub...@gmail.com wrote: Hi Xiangrui, I will try this shortly. When using N partitions, do you recommend N be the number of cores on each slave or the number of cores on the master? Forgive my ignorance, but is this best achieved as an argument to sc.textFile? The slaves on the EC2 clusters start with only 8gb of storage, and it doesn't seem that /mnt/spark and /mnt2/spark are mounted anywhere else by default. Looking at spark-ec2/setup-slaves.sh, it appears that these are only mounted if the instance type begins with r3. (Or am I not reading that right?) My slaves are a different instance type, and currently look like this: FilesystemSize Used Avail Use% Mounted on /dev/xvda17.9G 7.3G 515M 94% / tmpfs 7.4G 4.0K 7.4G 1% /dev/shm /dev/xvdv 500G 2.5G 498G 1% /vol I have been able to finish ALS on MovieLens 10M only twice, taking 221s and 315s for 20 iterations at K=20 and lambda=0.01. Does that timing sound about right, or does it point to a poor configuration? The same script with MovieLens 1M runs fine in about 30-40s with the same settings. (In both cases I'm training on 70% of the data.) Thanks for your help! Chris On Wed, Jul 16, 2014 at 4:29 PM, Xiangrui Meng men...@gmail.com wrote: For ALS, I would recommend repartitioning the ratings to match the number of CPU cores or even less. ALS is not computation heavy for small k but communication heavy. Having small number of partitions may help. For EC2 clusters, we use /mnt/spark and /mnt2/spark as the default local directory because they are local hard drives. Did your last run of ALS on MovieLens 10M-100K with the default settings succeed? -Xiangrui On Wed, Jul 16, 2014 at 8:00 AM, Chris DuBois chris.dub...@gmail.com wrote: Hi Xiangrui, I accidentally did not send df -i for the master node. 
Here it is at the moment of failure: FilesystemInodes IUsed IFree IUse% Mounted on /dev/xvda1524288 280938 243350 54% / tmpfs3845409 1 38454081% /dev/shm /dev/xvdb100024321027 100014051% /mnt /dev/xvdf10002432 16 100024161% /mnt2 /dev/xvdv524288000 13 5242879871% /vol I am using default settings now, but is there a way to make sure that the proper directories are being used? How many blocks/partitions do you recommend? Chris On Wed, Jul 16, 2014 at 1:09 AM, Chris DuBois chris.dub...@gmail.com wrote: Hi Xiangrui, Here is the result on the master node: $ df -i FilesystemInodes IUsed IFree IUse% Mounted on /dev/xvda1524288 273997 250291 53% / tmpfs1917974 1 19179731% /dev/shm /dev/xvdv524288000 30 5242879701% /vol I have reproduced the error while using the MovieLens 10M data set on a newly created cluster. Thanks for the help. Chris On Wed, Jul 16, 2014 at 12:22 AM, Xiangrui Meng men...@gmail.com wrote: Hi Chris, Could you also try `df -i` on the master node? How many blocks/partitions did you set? In the current implementation, ALS doesn't clean the shuffle data because the operations are chained together. But it shouldn't run out of disk space on the MovieLens dataset, which is small. spark-ec2 script sets /mnt/spark and /mnt/spark2 as the local.dir by
Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)
Thanks Marcelo! This is a huge help!! Looking at the executor logs (in a vanilla spark install, I'm finding them in $SPARK_HOME/work/*)... It launches the executor, but it looks like the CoarseGrainedExecutorBackend is having trouble talking to the driver (exactly what you said!!!). Do you know what the range of random ports that is used for the the executor-to-driver? Is that range adjustable? Any config setting or environment variable? I manually setup my ec2 security group to include all the ports that the spark ec2 script ($SPARK_HOME/ec2/spark_ec2.py) sets up in it's security groups. They included (for those listed above 1): 1 50060 50070 50075 60060 60070 60075 Obviously I'll need to make some adjustments to my EC2 security group! Just need to figure out exactly what should be in there. To keep things simple, I just have one security group for the master, slaves, and the driver machine. In listing the port ranges in my current security group I looked at the ports that spark_ec2.py sets up as well as the ports listed in the spark standalone mode documentation page under configuring ports for network security: http://spark.apache.org/docs/latest/spark-standalone.html Here are the relevant fragments from the executor log: Spark Executor Command: /cask/jdk/bin/java -cp ::/cask/spark/conf:/cask/spark/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/cask/spark/lib/datanucleus-api-jdo-3. 2.1.jar:/cask/spark/lib/datanucleus-rdbms-3.2.1.jar:/cask/spark/lib/datanucleus-core-3.2.2.jar -XX:MaxPermSize=128m -Dspark.akka.frameSize=100 -Dspark.akka. frameSize=100 -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787/user/CoarseGra inedScheduler 0 ip-10-202-8-45.ec2.internal 8 akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker app-20140717195146- ... 14/07/17 19:51:47 DEBUG NativeCodeLoader: Trying to load the custom-built native-hadoop library... 14/07/17 19:51:47 DEBUG NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path 14/07/17 19:51:47 DEBUG NativeCodeLoader: java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 14/07/17 19:51:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Falling back to shell based 14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping 14/07/17 19:51:48 DEBUG Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=30 14/07/17 19:51:48 DEBUG SparkHadoopUtil: running as user: ec2-user ... 14/07/17 19:51:48 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@ip-10-202-11-191.ec2.internal :46787/user/CoarseGrainedScheduler 14/07/17 19:51:48 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker 14/07/17 19:51:49 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker 14/07/17 19:53:29 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-202-8-45.ec2.internal:55670] - [akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787] disassociated! Shutting down. Thanks a bunch! 
Matt On Thu, Jul 17, 2014 at 1:21 PM, Marcelo Vanzin van...@cloudera.com wrote: When I meant the executor log, I meant the log of the process launched by the worker, not the worker. In my CDH-based Spark install, those end up in /var/run/spark/work. If you look at your worker log, you'll see it's launching the executor process. So there should be something there. Since you say it works when both are run in the same node, that probably points to some communication issue, since the executor needs to connect back to the driver. Check to see if you don't have any firewalls blocking the ports Spark tries to use. (That's one of the non-resource-related cases that will cause that message.)
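On the question of whether the driver port is adjustable: a sketch of pinning it so the security group rule can name it explicitly (assuming Spark 1.0 standalone; 46787 is just an example value, and other ports such as the file server's may still be chosen at random in this version, which is why the EC2 script's usual fallback is to allow all traffic between the cluster's security groups):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .set("spark.driver.port", "46787")   // fixed port the executors connect back to (random by default)
    val sc = new SparkContext(conf)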
Re: replacement for SPARK_LIBRARY_PATH ?
One way is to set this in your conf/spark-defaults.conf: spark.executor.extraLibraryPath /path/to/native/lib The key is documented here: http://spark.apache.org/docs/latest/configuration.html On Thu, Jul 17, 2014 at 1:25 PM, Eric Friedman eric.d.fried...@gmail.com wrote: I used to use SPARK_LIBRARY_PATH to specify the location of native libs for lzo compression when using spark 0.9.0. The references to that environment variable have disappeared from the docs for spark 1.0.1 and it's not clear how to specify the location for lzo. Any guidance?
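For example, a sketch of the conf/spark-defaults.conf entries covering both the executors and the driver (the native-library path is a placeholder for wherever your lzo .so files live; the driver-side key is an assumption that the driver also needs the codec locally):

    spark.executor.extraLibraryPath   /path/to/hadoop-lzo/native
    spark.driver.extraLibraryPath     /path/to/hadoop-lzo/native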
unserializable object in Spark Streaming context
Hi guys, need some help in this problem. In our use case, we need to continuously insert values into the database. So our approach is to create the jdbc object in the main method and then do the inserting operation in the DStream foreachRDD operation. Is this approach reasonable? Then the problem comes: since we are using com.mysql.jdbc.java, which is unserializable, we keep seeing the notSerializableException. I think that is because Spark Streaming is trying to serialize and then checkpoint the whole class which contains the StreamingContext, not only the StreamingContext object, right? Or other reason to trigger the serialize operation? Any workaround for this? (except not using the com.mysql.jdbc.java) Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
Re: unserializable object in Spark Streaming context
Could you share some code (or pseudo-code)? Sounds like you're instantiating the JDBC connection in the driver, and using it inside a closure that would be run in a remote executor. That means that the connection object would need to be serializable. If that sounds like what you're doing, it won't work. On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote: Hi guys, need some help in this problem. In our use case, we need to continuously insert values into the database. So our approach is to create the jdbc object in the main method and then do the inserting operation in the DStream foreachRDD operation. Is this approach reasonable? Then the problem comes: since we are using com.mysql.jdbc.java, which is unserializable, we keep seeing the notSerializableException. I think that is because Spark Streaming is trying to serialize and then checkpoint the whole class which contains the StreamingContext, not only the StreamingContext object, right? Or other reason to trigger the serialize operation? Any workaround for this? (except not using the com.mysql.jdbc.java) Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 -- Marcelo
Re: Include permalinks in mail footer
Good question.. I'll ask INFRA because I haven't seen other Apache mailing lists provide this. It would indeed be helpful. Matei On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com wrote: Can we modify the mailing list to include permalinks to the thread in the footer of every email? Or at least of the initial email in a thread? I often find myself wanting to reference one thread from another, or from a JIRA issue. Right now I have to google the thread subject and find the link that way. It would be nice to be able to find the permalink I need from the thread itself. It might also be helpful for people to include an unsubscribe link in the footer. That is a common practice in most mailing lists. Nick View this message in context: Include permalinks in mail footer Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Is there a way to get previous/other keys' state in Spark Streaming?
The updateFunction given in updateStateByKey should be called on ALL the keys that are in the state, even if there is no new data in the batch for some key. Is that not the behavior you see? What do you mean by show all the existing states? You have access to the latest state RDD by doing stateStream.foreachRDD(...). There you can do whatever operation on all the key-state pairs. TD On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you for the quick reply and for backing my approach. :) 1) The example is this: 1. In the first 2 second interval, after updateStateByKey, I get a few keys and their states, say, (a -> 1, b -> 2, c -> 3) 2. In the following 2 second interval, I only receive c and d and their values. But I want to update/display the state of a and b accordingly. * It seems I have no way to access a and b and get their states. * also, do I have a way to show all the existing states? I guess the approach to solve this will be similar to what you mentioned for 2). But the difficulty is that, if I want to display all the existing states, I need to bundle all the rest of the keys into one key. Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com wrote: For accessing the previous version, I would do it the same way. :) 1. Can you elaborate on what you mean by that with an example? What do you mean by accessing keys? 2. Yeah, that is hard to do without the ability to do point lookups into an RDD, which we don't support yet. You could try embedding the related key in the values of the keys that need it. That is, B will be present in the value of key A. Then put this transformed DStream through updateStateByKey. TD
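A small sketch of both points (assuming a hypothetical DStream of (String, Int) pairs named pairs): the update function fires for every key ever seen, and foreachRDD exposes the full state RDD each batch:

    val stateStream = pairs.updateStateByKey[Int]((newValues: Seq[Int], state: Option[Int]) => {
      // invoked for every key in the state, even when newValues is empty for this batch
      Some(state.getOrElse(0) + newValues.sum)
    })
    // dump all current (key, state) pairs every batch
    stateStream.foreachRDD(rdd => rdd.collect().foreach(println))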
Re: unserializable object in Spark Streaming context
And if Marcelo's guess is correct, then the right way to do this would be to lazily / dynamically create the jdbc connection server as a singleton in the workers/executors and use that. Something like this. dstream.foreachRDD(rdd => { rdd.foreachPartition((iterator: Iterator[...]) => { val driver = JDBCDriver.getSingleton() // this will create the single jdbc server in the worker, if it does not exist // loop through iterator to get the records in the partition and use the driver to push them out to the DB }) }) This will avoid the JDBC server being serialized as part of the closure / DStream checkpoint. TD On Thu, Jul 17, 2014 at 1:42 PM, Marcelo Vanzin van...@cloudera.com wrote: Could you share some code (or pseudo-code)? Sounds like you're instantiating the JDBC connection in the driver, and using it inside a closure that would be run in a remote executor. That means that the connection object would need to be serializable. If that sounds like what you're doing, it won't work. On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote: Hi guys, need some help in this problem. In our use case, we need to continuously insert values into the database. So our approach is to create the jdbc object in the main method and then do the inserting operation in the DStream foreachRDD operation. Is this approach reasonable? Then the problem comes: since we are using com.mysql.jdbc.java, which is unserializable, we keep seeing the notSerializableException. I think that is because Spark Streaming is trying to serialize and then checkpoint the whole class which contains the StreamingContext, not only the StreamingContext object, right? Or other reason to trigger the serialize operation? Any workaround for this? (except not using the com.mysql.jdbc.java) Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 -- Marcelo
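A more concrete sketch of that pattern (the MySQL URL, table, and column names are purely illustrative, and dstream is assumed to carry (String, Int) pairs; the connection is created lazily once per executor JVM rather than shipped from the driver):

    import java.sql.{Connection, DriverManager}

    object JDBCConnection {
      // initialized on first use inside an executor, never serialized with the closure
      lazy val conn: Connection =
        DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password")
    }

    dstream.foreachRDD(rdd => {
      rdd.foreachPartition(records => {
        val stmt = JDBCConnection.conn.prepareStatement("INSERT INTO scores (id, value) VALUES (?, ?)")
        records.foreach { case (id, value) =>
          stmt.setString(1, id)
          stmt.setInt(2, value)
          stmt.addBatch()
        }
        stmt.executeBatch()
        stmt.close()
      })
    })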
Re: Spark Streaming timestamps
Hi Tathagata, Thanks for your answer. Please see my further question below: On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Answers inline. On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently using Spark Streaming to conduct a real-time data analytics. We receive data from Kafka. We want to generate output files that contain results that are based on the data we receive from a specific time interval. I have several questions on Spark Streaming's timestamp: 1) If I use saveAsTextFiles, it seems Spark streaming will generate files in complete minutes, such as 5:00:01, 5:00:01 (converted from Unix time), etc. Does this mean the results are based on the data from 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc. Or the time stamps just mean the time the files are generated? File named 5:00:01 contains results from data received between 5:00:00 and 5:00:01 (based on system time of the cluster). 2) If I do not use saveAsTextFiles, how do I get the exact time interval of the RDD when I use foreachRDD to do custom output of the results? There is a version of foreachRDD which allows you specify the function that takes in Time object. 3) How can we specify the starting time of the batches? What do you mean? Batches are timed based on the system time of the cluster. I would like to control the starting time and ending time of each batch. For example, if I use saveAsTextFiles as output method and the batch size is 1 minute, Spark will align time intervals to complete minutes, such as 5:01:00, 5:02:00, 5:03:00. It will have not results that are 5:01:03, 5:02:03, 5:03:03, etc. My goal is to generate output for a customized interval such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc. I checked the api of foreachRDD with time parameter. It seems there is not explanation on what does that parameter mean. Does it mean the starting time of the first batch? Thanks! Bill
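Regarding the foreachRDD variant that takes a Time parameter, a sketch of how it can be used to name output by the batch boundary (assuming Spark 1.0 and a hypothetical DStream[String] named stream; the output path is a placeholder):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time

    stream.foreachRDD((rdd: RDD[String], time: Time) => {
      // time is the batch boundary this RDD belongs to, in milliseconds since the epoch
      rdd.saveAsTextFile("hdfs:///output/batch-" + time.milliseconds)
    })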
how to pass extra Java opts to workers for spark streaming jobs
I am using spark 0.9.0 and I am able to submit job to YARN, https://spark.apache.org/docs/0.9.0/running-on-yarn.html. I am trying to turn on gc logging on executors but could not find a way to set extra Java opts for workers. I tried to set spark.executor.extraJavaOptions but that did not work. Any idea on how I should do this? -- Chen Song
Re: spark streaming rate limiting from kafka
Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far Kafka Dstream can read on topic-partition (via offset for example). By setting this to a small number, it will force DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
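A sketch of passing that parameter through the Kafka receiver (assuming Spark 1.0's streaming-kafka module and an existing StreamingContext ssc; the zookeeper address, group id and topic name are placeholders):

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map(
      "zookeeper.connect" -> "zk1:2181",
      "group.id" -> "my-consumer-group",
      "auto.offset.reset" -> "smallest")   // start from the earliest offset Kafka still retains, instead of the latest

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("mytopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)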
Re: Apache kafka + spark + Parquet
val kafkaStream = KafkaUtils.createStream(... ) // see the example in my previous post val transformedStream = kafkaStream.map ... // whatever transformation you want to do transformedStream.foreachRDD((rdd: RDD[...], time: Time) => { // save the rdd to parquet file, using time as the file name, see other link i sent in how to do it // every batch of data will create a new parquet file }) Maybe michael (cc'ed) will be able to give more insights about the parquet stuff. TD On Thu, Jul 17, 2014 at 3:59 AM, Mahebub Sayyed mahebub...@gmail.com wrote: Hi, To migrate data from *HBase *to *Parquet* we used following query through * Impala*: INSERT INTO table PARQUET_HASHTAGS( key, city_name, country_name, hashtag_date, hashtag_text, hashtag_source, hashtag_month, posted_time, hashtag_time, tweet_id, user_id, user_name, hashtag_year ) *partition(year, month, day)* SELECT key, city_name, country_name, hashtag_date, hashtag_text, hashtag_source, hashtag_month, posted_time, hashtag_time, tweet_id, user_id, user_name,hashtag_year, cast(hashtag_year as int),cast(hashtag_month as int), cast(hashtag_date as int) from HASHTAGS where hashtag_year='2014' and hashtag_month='04' and hashtag_date='01' ORDER BY key,hashtag_year,hashtag_month,hashtag_date LIMIT 1000 offset 0; using above query we have successfully migrated from HBase to Parquet files with proper partitions. Now we are storing Data direct from *Kafka *to *Parquet.* *How is it possible to create partitions while storing data direct from kafka to Parquet files??* *(likewise created in above query)* On Thu, Jul 17, 2014 at 12:35 PM, Tathagata Das tathagata.das1...@gmail.com wrote: 1. You can put in multiple kafka topics in the same Kafka input stream. See the example KafkaWordCount https://github.com/apache/spark/blob/68f28dabe9c7679be82e684385be216319beb610/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala . However they will all be read through a single receiver (though multiple threads, one per topic). To parallelize the read (for increasing throughput), you can create multiple Kafka input streams, and split the topics appropriately between them. 2. You can easily read and write to parquet files in Spark. Any RDD (generated through DStreams in Spark Streaming, or otherwise), can be converted to a SchemaRDD and then saved in the parquet format as rdd.saveAsParquetFile. See the Spark SQL guide http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files for more details. So if you want to write the same dataset (as RDDs) to two different parquet files, you just have to call saveAsParquetFile twice (on same or transformed versions of the RDD), as shown in the guide. Hope this helps! TD On Thu, Jul 17, 2014 at 2:19 AM, Mahebub Sayyed mahebub...@gmail.com wrote: Hi All, Currently we are reading (multiple) topics from Apache kafka and storing that in HBase (multiple tables) using twitter storm (1 tuple stores in 4 different tables). but we are facing some performance issue with HBase. so we are replacing* HBase* with *Parquet* file and *storm* with *Apache Spark*. difficulties: 1. How to read multiple topics from kafka using spark? 2. One tuple belongs to multiple tables, How to write one topic to multiple parquet files with proper partitioning using spark?? Please help me Thanks in advance. -- *Regards,* *Mahebub * -- *Regards,* *Mahebub Sayyed*
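A sketch of the parquet-writing side (assuming Spark 1.0 SQL, an existing SparkContext sc, and that transformedStream already carries Hashtag records; the Hashtag case class and the output layout are illustrative, not the schema of the Impala table above):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.Time

    case class Hashtag(key: String, text: String, year: Int, month: Int, day: Int)

    val sqlContext = new SQLContext(sc)

    transformedStream.foreachRDD((rdd: RDD[Hashtag], time: Time) => {
      // one parquet directory per batch; a year=/month=/day= style path layout can emulate the Impala partitioning
      val path = "hdfs:///parquet_hashtags/batch-" + time.milliseconds
      sqlContext.createSchemaRDD(rdd).saveAsParquetFile(path)
    })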
Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)
Hi Matt, I'm not very familiar with setup on ec2; the closest I can point you at is to look at the launch_cluster in ec2/spark_ec2.py, where the ports seem to be configured. On Thu, Jul 17, 2014 at 1:29 PM, Matt Work Coarr mattcoarr.w...@gmail.com wrote: Thanks Marcelo! This is a huge help!! Looking at the executor logs (in a vanilla spark install, I'm finding them in $SPARK_HOME/work/*)... It launches the executor, but it looks like the CoarseGrainedExecutorBackend is having trouble talking to the driver (exactly what you said!!!). Do you know what the range of random ports that is used for the the executor-to-driver? Is that range adjustable? Any config setting or environment variable? I manually setup my ec2 security group to include all the ports that the spark ec2 script ($SPARK_HOME/ec2/spark_ec2.py) sets up in it's security groups. They included (for those listed above 1): 1 50060 50070 50075 60060 60070 60075 Obviously I'll need to make some adjustments to my EC2 security group! Just need to figure out exactly what should be in there. To keep things simple, I just have one security group for the master, slaves, and the driver machine. In listing the port ranges in my current security group I looked at the ports that spark_ec2.py sets up as well as the ports listed in the spark standalone mode documentation page under configuring ports for network security: http://spark.apache.org/docs/latest/spark-standalone.html Here are the relevant fragments from the executor log: Spark Executor Command: /cask/jdk/bin/java -cp ::/cask/spark/conf:/cask/spark/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/cask/spark/lib/datanucleus-api-jdo-3. 2.1.jar:/cask/spark/lib/datanucleus-rdbms-3.2.1.jar:/cask/spark/lib/datanucleus-core-3.2.2.jar -XX:MaxPermSize=128m -Dspark.akka.frameSize=100 -Dspark.akka. frameSize=100 -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787/user/CoarseGra inedScheduler 0 ip-10-202-8-45.ec2.internal 8 akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker app-20140717195146- ... 14/07/17 19:51:47 DEBUG NativeCodeLoader: Trying to load the custom-built native-hadoop library... 14/07/17 19:51:47 DEBUG NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path 14/07/17 19:51:47 DEBUG NativeCodeLoader: java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 14/07/17 19:51:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Falling back to shell based 14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping 14/07/17 19:51:48 DEBUG Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=30 14/07/17 19:51:48 DEBUG SparkHadoopUtil: running as user: ec2-user ... 
14/07/17 19:51:48 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787/user/CoarseGrainedScheduler 14/07/17 19:51:48 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker 14/07/17 19:51:49 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker 14/07/17 19:53:29 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-202-8-45.ec2.internal:55670] - [akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787] disassociated! Shutting down. Thanks a bunch! Matt On Thu, Jul 17, 2014 at 1:21 PM, Marcelo Vanzin van...@cloudera.com wrote: When I meant the executor log, I meant the log of the process launched by the worker, not the worker. In my CDH-based Spark install, those end up in /var/run/spark/work. If you look at your worker log, you'll see it's launching the executor process. So there should be something there. Since you say it works when both are run in the same node, that probably points to some communication issue, since the executor needs to connect back to the driver. Check to see if you don't have any firewalls blocking the ports Spark tries to use. (That's one of the non-resource-related cases that will cause that message.) -- Marcelo
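One hedged option for the EC2 security-group problem discussed above: pin the driver's port so the group only needs to allow a known port for executor-to-driver traffic. spark.driver.port is a real setting, but note that on this Spark version other services (block manager, file server) may still pick random ports, so a rule allowing all traffic between members of the same security group is often the simpler fix. A sketch with a hypothetical app name:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("port-pinning-example")   // hypothetical app name
  .set("spark.driver.port", "46787")    // fixed driver port; open this port in the EC2 security group
val sc = new SparkContext(conf)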
Re: unserializable object in Spark Streaming context
Hi Marcelo and TD, Thank you for the help. If I use TD's approach, it works and there is no exception. The only drawback is that it will create many connections to the DB, which I was trying to avoid. Here is a snapshot of my code (the important parts were marked in red in the original message). What I was thinking is that, if I call the collect() method, Spark Streaming will bring the data to the driver and then the db object does not need to be sent to executors. My observation is that, though exceptions are thrown, the insert function still works. Any thought about that? I also pasted the log in case it helps: http://pastebin.com/T1bYvLWB == code == SparkConf sparkConf = new SparkConf().setAppName("balababala"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); final MySQLHelper db = new MySQLHelper(); // this class instantiates the jdbc driver /* a few DStream transformations */ JavaPairDStream<String, MachineState> noiseState = machineIdNoise.updateStateByKey(getUpdateFunction()); JavaPairDStream<String, Tuple2<MachineState, Integer>> noiseStateTemperature = noiseState.join(machineIdTemperature); noiseStateTemperature.foreachRDD(new Function<JavaPairRDD<String, Tuple2<MachineState, Integer>>, Void>() { @Override public Void call(JavaPairRDD<String, Tuple2<MachineState, Integer>> arg0) throws Exception { List<Tuple2<String, Tuple2<MachineState, Integer>>> list = arg0.collect(); for (Tuple2<String, Tuple2<MachineState, Integer>> tuple : list) { String machineId = ...; String machineState = ...; db.insertAverages(machineId, machineState); } return null; } }); == end code == Thank you. If there is no other workaround, I may use TD's approach because it is the only option. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Thu, Jul 17, 2014 at 1:54 PM, Tathagata Das tathagata.das1...@gmail.com wrote: And if Marcelo's guess is correct, then the right way to do this would be to lazily / dynamically create the jdbc connection server as a singleton in the workers/executors and use that. Something like this. dstream.foreachRDD(rdd => { rdd.foreachPartition((iterator: Iterator[...]) => { val driver = JDBCDriver.getSingleton() // this will create the single jdbc server in the worker, if it does not exist // loop through iterator to get the records in the partition and use the driver to push them out to the DB }) }) This will avoid the JDBC server being serialized as part of the closure / DStream checkpoint. TD On Thu, Jul 17, 2014 at 1:42 PM, Marcelo Vanzin van...@cloudera.com wrote: Could you share some code (or pseudo-code)? Sounds like you're instantiating the JDBC connection in the driver, and using it inside a closure that would be run in a remote executor. That means that the connection object would need to be serializable. If that sounds like what you're doing, it won't work. On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote: Hi guys, I need some help with this problem. In our use case, we need to continuously insert values into the database. So our approach is to create the jdbc object in the main method and then do the inserting operation in the DStream foreachRDD operation. Is this approach reasonable? Then the problem comes: since we are using com.mysql.jdbc.java, which is unserializable, we keep seeing the notSerializableException. I think that is because Spark Streaming is trying to serialize and then checkpoint the whole class which contains the StreamingContext, not only the StreamingContext object, right? Or is there another reason that triggers the serialization? Any workaround for this?
(except not using the com.mysql.jdbc.java) Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 -- Marcelo
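To make TD's pseudo-code above concrete, a short Scala sketch of the same idea (a Java program would use a static holder class in the same way). MySQLHelper and insertAverages are the names from Yan's code; the rest is assumed. The helper is created lazily inside each executor JVM, so nothing unserializable is captured by the closure or the DStream checkpoint:

object MySQLHelperSingleton {
  lazy val db = new MySQLHelper()   // one JDBC helper per executor JVM, created on first use
}

noiseStateTemperature.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    val db = MySQLHelperSingleton.db   // fetched inside the task, never serialized
    iter.foreach { case (machineId, (state, temperature)) =>
      db.insertAverages(machineId, state.toString)   // insertAverages as in the original code
    }
  }
}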
Re: Apache kafka + spark + Parquet
We don't have support for partitioned parquet yet. There is a JIRA here: https://issues.apache.org/jira/browse/SPARK-2406 On Thu, Jul 17, 2014 at 5:00 PM, Tathagata Das tathagata.das1...@gmail.com wrote: val kafkaStream = KafkaUtils.createStream(... ) // see the example in my previous post val transformedStream = kafkaStream.map ... // whatever transformation you want to do transformedStream.foreachRDD((rdd: RDD[...], time: Time) = { // save the rdd to parquet file, using time as the file name, see other link i sent in how to do it // every batch of data will create a new parquet file }) Maybe michael (cc'ed) will be able to give more insights about the parquet stuff. TD On Thu, Jul 17, 2014 at 3:59 AM, Mahebub Sayyed mahebub...@gmail.com wrote: Hi, To migrate data from *HBase *to *Parquet* we used following query through * Impala*: INSERT INTO table PARQUET_HASHTAGS( key, city_name, country_name, hashtag_date, hashtag_text, hashtag_source, hashtag_month, posted_time, hashtag_time, tweet_id, user_id, user_name, hashtag_year ) *partition(year, month, day)* SELECT key, city_name, country_name, hashtag_date, hashtag_text, hashtag_source, hashtag_month, posted_time, hashtag_time, tweet_id, user_id, user_name,hashtag_year, cast(hashtag_year as int),cast(hashtag_month as int), cast(hashtag_date as int) from HASHTAGS where hashtag_year='2014' and hashtag_month='04' and hashtag_date='01' ORDER BY key,hashtag_year,hashtag_month,hashtag_date LIMIT 1000 offset 0; using above query we have successfully migrated form HBase to Parquet files with proper partitions. Now we are storing Data direct from *Kafka *to *Parquet.* *How is it possible to create partitions while storing data direct from kafka to Parquet files??* *(likewise created in above query)* On Thu, Jul 17, 2014 at 12:35 PM, Tathagata Das tathagata.das1...@gmail.com wrote: 1. You can put in multiple kafka topics in the same Kafka input stream. See the example KafkaWordCount https://github.com/apache/spark/blob/68f28dabe9c7679be82e684385be216319beb610/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala . However they will all be read through a single receiver (though multiple threads, one per topic). To parallelize the read (for increasing throughput), you can create multiple Kafka input streams, and splits the topics appropriately between them. 2. You can easily read and write to parquet files in Spark. Any RDD (generated through DStreams in Spark Streaming, or otherwise), can be converted to a SchemaRDD and then saved in the parquet format as rdd.saveAsParquetFile. See the Spark SQL guide http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files for more details. So if you want to write a same dataset (as RDDs) to two different parquet files, you just have to call saveAsParquetFile twice (on same or transformed versions of the RDD), as shown in the guide. Hope this helps! TD On Thu, Jul 17, 2014 at 2:19 AM, Mahebub Sayyed mahebub...@gmail.com wrote: Hi All, Currently we are reading (multiple) topics from Apache kafka and storing that in HBase (multiple tables) using twitter storm (1 tuple stores in 4 different tables). but we are facing some performance issue with HBase. so we are replacing* HBase* with *Parquet* file and *storm* with *Apache Spark*. difficulties: 1. How to read multiple topics from kafka using spark? 2. One tuple belongs to multiple tables, How to write one topic to multiple parquet files with proper partitioning using spark?? Please help me Thanks in advance. 
-- Regards, Mahebub -- Regards, Mahebub Sayyed
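Until SPARK-2406 (mentioned above) lands, one possible workaround is to write Hive/Impala-style partition directories by hand. A sketch, assuming an RDD[Hashtag] named hashtagRDD with year/month/day fields (hypothetical names), a SQLContext named sqlContext, and the Spark 1.0 implicit SchemaRDD conversion; it makes one pass over the data per distinct day, so it only suits a small number of partition values per batch:

import sqlContext.createSchemaRDD   // implicit RDD[Hashtag] -> SchemaRDD

val days = hashtagRDD.map(h => (h.year, h.month, h.day)).distinct().collect()
for ((y, m, d) <- days) {
  hashtagRDD.filter(h => h.year == y && h.month == m && h.day == d)
    .saveAsParquetFile(s"hdfs:///data/parquet/hashtags/year=$y/month=$m/day=$d")
}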
Re: Retrieve dataset of Big Data Benchmark
Hi Burak, I tried running it through the Spark shell, but I still ended with the same error message as in Hadoop: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively). I guess the files are publicly available, but only to registered AWS users, so I caved in and registered for the service. Using the credentials that I got I was able to download the files using the local spark shell. Thanks! Tom -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Retrieve-dataset-of-Big-Data-Benchmark-tp9821p10096.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
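For anyone else retrieving the benchmark files, the credentials can also be supplied through the Hadoop configuration from the Spark shell instead of being embedded in the s3n:// URL. The property names are the ones in the error message above; the bucket path is only an example and may differ:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY_ID")          // placeholder
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")  // placeholder
val rankings = sc.textFile("s3n://big-data-benchmark/pavlo/text/tiny/rankings")    // example path
rankings.take(5).foreach(println)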
Error with spark-submit
Hi,I am new to Spark and trying out with a stand-alone, 3-node (1 master, 2 workers) cluster. From the Web UI at the master, I see that the workers are registered. But when I try running the SparkPi example from the master node, I get the following message and then an exception.14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master spark://10.1.3.7:7077...14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memoryI searched a bit for the above warning, and found and found that others have encountered this problem before, but did not see a clear resolution except for this link: http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444 Based on the suggestion there I tried supplying --executor-memory option to spark-submit but that did not help.Any suggestions. Here are the details of my set up. - 3 nodes (each with 4 CPU cores and 7 GB memory) - 1 node configured as Master, and the other two configured as workers - Firewall is disabled on all nodes, and network communication between the nodes is not a problem - Edited the conf/spark-env.sh on all nodes to set the following: SPARK_WORKER_CORES=3SPARK_WORKER_MEMORY=5G - The Web UI as well as logs on master show that Workers were able to register correctly. Also the Web UI correctly shows the aggregate available memory and CPU cores on the workers:URL: spark://vmsparkwin1:7077Workers: 2Cores: 6 Total, 0 UsedMemory: 10.0 GB Total, 0.0 B UsedApplications: 0 Running, 0 CompletedDrivers: 0 Running, 0 CompletedStatus: ALIVEI try running the SparkPi example first using the run-example (which was failing) and later directly using the spark-submit as shown below:azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0$ export MASTER=spark://vmsparkwin1:7077$ echo $MASTERspark://vmsparkwin1:7077azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10The following is the full screen output:14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(azureuser)14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started14/07/17 01:20:14 INFO Remoting: Starting remoting14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at C:\cygwin\tmp\spark-local-20140717012014-b60614/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9 MB.14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with id = ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net,49842)14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager14/07/17 01:20:14 
INFO BlockManagerInfo: Registering block manager vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49842 with 294.9 MB RAM14/07/17 01:20:14 INFO BlockManagerMaster: Registered BlockManager14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server14/07/17 01:20:14 INFO HttpBroadcast: Broadcast server started at http://10.1.3.7:4984314/07/17 01:20:14 INFO HttpFileServer: HTTP File server directory is C:\cygwin\tmp\spark-6a076e92-53bb-4c7a-9e27-ce53a818146d14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server14/07/17 01:20:15 INFO SparkUI: Started SparkUI at http://vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:404014/07/17 01:20:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable14/07/17 01:20:16 INFO SparkContext: Added JAR file:/C:/opt/spark-1.0.0/./lib/spark-examples-1.0.0-hadoop2.2.0.jar at http://10.1.3.7:49844/jars/spark-examples-1.0.0-hadoop2.2.0.jar with timestamp 140556001631614/07/17 01:20:16 INFO AppClient$ClientActor: Connecting to master spark://10.1.3.7:7077...14/07/17 01:20:16 INFO SparkContext: Starting job: reduce at SparkPi.scala:3514/07/17 01:20:16 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:35) with 10 output partitions (allowLocal=false)14/07/17 01:20:16 INFO DAGScheduler: Final stage: Stage 0(reduce at SparkPi.scala:35)14/07/17 01:20:16 INFO DAGScheduler: Parents of final stage: List()14/07/17 01:20:16 INFO
Large scale ranked recommendation
Hi, I am trying to develop a recommender system for about 1 million users and 10 thousand items. Currently it's a simple regression-based model where for every (user, item) pair in the dataset we generate some features and learn a model from them. Training and evaluation are fine; the bottleneck is prediction and ranking for deployment, since at the end of the day we need to recommend each user their top 10 personalized items. To do this, for every user I need to use the model to predict their rating/preference for all items and take the top 10 items from the list. Hence after learning the model I need to do 10K x 1 million predictions (model.predict(featureVector)). Currently I have the following process; feature vectors are sparse and of length ~300 each. 1. userFeatures:RDD[(Int, Vector)], itemFeatures:RDD[(Int, Vector)] I do a cartesian product of the above to generate every (user, item) combination and the corresponding feature: 2. val allUIFeat:RDD[(Int, Int, Vector)] = userFeatures.cartesian(itemFeatures).map(...) Then I use the model to do prediction as follows: 3. val allUIPred:RDD[(Int, Int, Double)] = allUIFeat.map{ x => (x._1, x._2, model.predict(x._3)) } 4. Then we group by user and sort to get the top 10 items. We are not able to complete step 3 above; it's taking a really long time (~5hrs) to get all the predictions, which is really long considering we already have the model and it just needs to do some computation for prediction. I have tried partitioning userFeatures across 800 partitions before doing the above steps, but it did not help. I am using about 100 executors, 2 cores, each executor with 2 GB RAM. Are there any suggestions to make these predictions faster? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
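One direction that may help, sketched below with stated assumptions: with only ~10K items, the item feature vectors fit in memory, so they can be broadcast and all item scores for a user computed inside a single map task, avoiding the 10-billion-row cartesian RDD and its shuffle. scoreFor(...) is a stand-in for whatever the regression model computes (for example a dot product with the learned weights) and is not an MLlib API:

// Assumed stand-in for the model's scoring of a user/item feature pair.
def scoreFor(u: Array[Double], i: Array[Double]): Double =
  u.zip(i).map { case (a, b) => a * b }.sum   // placeholder: plain dot product

val itemFeats: Array[(Int, Array[Double])] =
  itemFeatures.map { case (id, v) => (id, v.toArray) }.collect()   // ~10K items, fits in memory
val itemFeatsBC = sc.broadcast(itemFeats)

val top10PerUser = userFeatures.map { case (userId, uVec) =>
  val uFeat = uVec.toArray
  val scored = itemFeatsBC.value.map { case (itemId, iFeat) => (itemId, scoreFor(uFeat, iFeat)) }
  (userId, scored.sortBy(-_._2).take(10).toSeq)   // top 10 items for this user, no shuffle needed
}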
Re: Release date for new pyspark
Thanks all! (And thanks Matei for the developer link!) I was able to build using maven[1] but `./sbt/sbt assembly` results in build errors. (Not familiar enough with the build to know why; in the past sbt worked for me and maven did not). I was able to run the master version of pyspark, which was what I wanted, though I discovered a bug when trying to read spark-pickled data from HDFS. (Looks similar to https://spark-project.atlassian.net/browse/SPARK-1034 from my naive point of view). For the curious: Code: conf = SparkConf() conf.set('spark.local.dir', '/nail/tmp') conf.set('spark.executor.memory', '28g') conf.set('spark.app.name', 'test') sc = SparkContext(conf=conf) sc.parallelize(range(10)).saveAsPickleFile(hdfs://host:9000/test_pickle) unpickled_rdd = sc.pickleFile(hdfs://host:9000/test_pickle) print unpickled_rdd.takeSample(False, 3) Traceback (most recent call last): File /path/to/my/home/spark-master/tast.py, line 33, in module print unpickled_rdd.takeSample(False, 3) File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 391, in takeSample initialCount = self.count() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 791, in count return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 782, in sum return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 703, in reduce vals = self.mapPartitions(func).collect() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 667, in collect bytesInJava = self._jrdd.collect().iterator() File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 1600, in _jrdd class_tag) File /path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 669, in __call__ File /path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 304, in get_return_value py4j.protocol.Py4JError: An error occurred while calling None.org.apache.spark.api.python.PythonRDD. Trace: py4j.Py4JException: Constructor org.apache.spark.api.python.PythonRDD([class org.apache.spark.rdd.FlatMappedRDD, class [B, class java.util.HashMap, class java.util.ArrayList, class java.lang.Boolean, class java.lang.String, class java.util.ArrayList, class org.apache.spark.Accumulator, class scala.reflect.ManifestFactory$$anon$2]) does not exist at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:184) at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:202) at py4j.Gateway.invoke(Gateway.java:213) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:662) [1] mvn -Phadoop-2.3 -Dhadoop.verson=2.3.0 -DskipTests clean package On Wed, Jul 16, 2014 at 8:39 PM, Michael Armbrust mich...@databricks.com wrote: You should try cleaning and then building. We have recently hit a bug in the scala compiler that sometimes causes non-clean builds to fail. On Wed, Jul 16, 2014 at 7:56 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Yeah, we try to have a regular 3 month release cycle; see https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage for the current window. Matei On Jul 16, 2014, at 4:21 PM, Mark Hamstra m...@clearstorydata.com wrote: You should expect master to compile and run: patches aren't merged unless they build and pass tests on Jenkins. 
You shouldn't expect new features to be added to stable code in maintenance releases (e.g. 1.0.1). AFAIK, we're still on track with Spark 1.1.0 development, which means that it should be released sometime in the second half of next month (or shortly thereafter). On Wed, Jul 16, 2014 at 4:03 PM, Paul Wais pw...@yelp.com wrote: Dear List, The version of pyspark on master has a lot of nice new features, e.g. SequenceFile reading, pickle i/o, etc: https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353 I downloaded the recent 1.0.1 release and was surprised to see the distribution did not include these changes in master. (I've tried pulling master [ 9c249743ea ] and compiling from source, but I get a build failure in TestSQLContext.scala FWIW). Is an updated pyspark scheduled for the next release? (Also, am I wrong in expecting HEAD on master should probably compile and run?) Best Regards, -Paul Wais
Error with spark-submit (formatting corrected)
Hi, I am new to Spark and trying out with a stand-alone, 3-node (1 master, 2 workers) cluster. From the Web UI at the master, I see that the workers are registered. But when I try running the SparkPi example from the master node, I get the following message and then an exception. 14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master spark://10.1.3.7:7077... 14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory I searched a bit for the above warning, and found and found that others have encountered this problem before, but did not see a clear resolution except for this link: http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444 Based on the suggestion there I tried supplying --executor-memory option to spark-submit but that did not help. Any suggestions. Here are the details of my set up. - 3 nodes (each with 4 CPU cores and 7 GB memory) - 1 node configured as Master, and the other two configured as workers - Firewall is disabled on all nodes, and network communication between the nodes is not a problem - Edited the conf/spark-env.sh on all nodes to set the following: SPARK_WORKER_CORES=3 SPARK_WORKER_MEMORY=5G - The Web UI as well as logs on master show that Workers were able to register correctly. Also the Web UI correctly shows the aggregate available memory and CPU cores on the workers: URL: spark://vmsparkwin1:7077 Workers: 2 Cores: 6 Total, 0 Used Memory: 10.0 GB Total, 0.0 B Used Applications: 0 Running, 0 Completed Drivers: 0 Running, 0 Completed Status: ALIVE I try running the SparkPi example first using the run-example (which was failing) and later directly using the spark-submit as shown below: $ export MASTER=spark://vmsparkwin1:7077 $ echo $MASTER spark://vmsparkwin1:7077 azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0 $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10 The following is the full screen output: 14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser 14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(azureuser) 14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started 14/07/17 01:20:14 INFO Remoting: Starting remoting 14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839] 14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839] 14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker 14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster 14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at C:\cygwin\tmp\spark-local-20140717012014-b606 14/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9 MB. 
14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with id = ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net,49842) 14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager 14/07/17 01:20:14 INFO BlockManagerInfo: Registering block manager vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49842 with 294.9 MB RAM 14/07/17 01:20:14 INFO BlockManagerMaster: Registered BlockManager 14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server 14/07/17 01:20:14 INFO HttpBroadcast: Broadcast server started at http://10.1.3.7:49843 14/07/17 01:20:14 INFO HttpFileServer: HTTP File server directory is C:\cygwin\tmp\spark-6a076e92-53bb-4c7a-9e27-ce53a818146d 14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server 14/07/17 01:20:15 INFO SparkUI: Started SparkUI at http://vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:4040 14/07/17 01:20:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/17 01:20:16 INFO SparkContext: Added JAR file:/C:/opt/spark-1.0.0/./lib/spark-examples-1.0.0-hadoop2.2.0.jar at http://10.1.3.7:49844/jars/spark-examples-1.0.0-hadoop2.2.0.jar with timestamp 1405560016316 14/07/17 01:20:16 INFO AppClient$ClientActor: Connecting to master spark://10.1.3.7:7077... 14/07/17 01:20:16 INFO SparkContext: Starting job: reduce at SparkPi.scala:35 14/07/17 01:20:16 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:35) with 10 output partitions (allowLocal=false) 14/07/17 01:20:16 INFO DAGScheduler: Final stage: Stage 0(reduce at SparkPi.scala:35) 14/07/17 01:20:16 INFO DAGScheduler: Parents of final stage: List()
Re: Large scale ranked recommendation
We are using RegressionModels that comes with *mllib* package in SPARK. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10103.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Large scale ranked recommendation
Hi, Are you suggesting that taking simple vector dot products or sigmoid function on 10K * 1M data takes 5hrs? On Thu, Jul 17, 2014 at 3:59 PM, m3.sharma sharm...@umn.edu wrote: We are using RegressionModels that comes with *mllib* package in SPARK. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10103.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark streaming rate limiting from kafka
I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even if I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far Kafka Dstream can read on topic-partition (via offset for example). By setting this to a small number, it will force DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
Re: Large scale ranked recommendation
Yes, that's what prediction should be doing: taking a dot product or applying the sigmoid function for each (user, item) pair. For 1 million users and 10K items that is 10 billion pairs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10107.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark Streaming
Hi All, When I run Spark Streaming, in one of the flatMap stages I want to access a database. The code looks like: stream.flatMap( new FlatMapFunction { call () { // access database cluster } } ) Since I don't want to create a database connection every time call() is invoked, where is the best place to create the connection and reuse it on a per-host basis (like one database connection per Mapper/Reducer)? Regards, Guangle
Re: unserializable object in Spark Streaming context
Hi Sean, Thank you. I see your point. What I was thinking is that, do computation in a distributed fashion and do the storing from a single place. But you are right, having multiple DB connections actually is fine. Thanks for answering my questions. That helps me understand the system. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen so...@cloudera.com wrote: On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang yanfang...@gmail.com wrote: Thank you for the help. If I use TD's approache, it works and there is no exception. Only drawback is that it will create many connections to the DB, which I was trying to avoid. Connection-like objects aren't data that can be serialized. What would it mean to share one connection with N workers? that they all connect back to the driver, and through one DB connection there? this defeats the purpose of distributed computing. You want multiple DB connections. You can limit the number of partitions if needed. Here is a snapshot of my code. Mark as red for the important code. What I was thinking is that, if I call the collect() method, Spark Streaming will bring the data to the driver and then the db object does not need to be sent The Function you pass to foreachRDD() has a reference to db though. That's what is making it be serialized. to executors. My observation is that, thought exceptions are thrown, the insert function still works. Any thought about that? Also paste the log in case it helps .http://pastebin.com/T1bYvLWB Any executors that run locally might skip the serialization and succeed (?) but I don't think the remote executors can be succeeding.
Hive From Spark
Hello Spark Users, I am new to Spark SQL and now trying to first get the HiveFromSpark example working. However, I got the following error when running HiveFromSpark.scala program. May I get some help on this please? ERROR MESSAGE: org.apache.thrift.TApplicationException: Invalid method name: 'get_table' at org.apache.thrift.TApplicationException.read(TApplicationException.java:108) at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:936) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:922) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:854) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) at $Proxy9.getTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:905) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:8999) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:8313) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:284) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:441) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:342) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:977) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:186) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:160) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:247) at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:85) at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90) at HiveFromSpark$.main(HiveFromSpark.scala:38) at HiveFromSpark.main(HiveFromSpark.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Thank you very much! JJing -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Hive-From-Spark-tp10110.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: replacement for SPARK_LIBRARY_PATH ?
but be aware that spark-defaults.conf is only used if you use spark-submit On Jul 17, 2014 4:29 PM, Zongheng Yang zonghen...@gmail.com wrote: One way is to set this in your conf/spark-defaults.conf: spark.executor.extraLibraryPath /path/to/native/lib The key is documented here: http://spark.apache.org/docs/latest/configuration.html On Thu, Jul 17, 2014 at 1:25 PM, Eric Friedman eric.d.fried...@gmail.com wrote: I used to use SPARK_LIBRARY_PATH to specify the location of native libs for lzo compression when using spark 0.9.0. The references to that environment variable have disappeared from the docs for spark 1.0.1 and it's not clear how to specify the location for lzo. Any guidance?
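When the application is not launched through spark-submit (so spark-defaults.conf is not read), the same setting can be placed on the SparkConf directly; a sketch with a placeholder path and app name:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("lzo-app")   // hypothetical app name
  .set("spark.executor.extraLibraryPath", "/opt/hadoop/lib/native")   // placeholder path to the lzo native libs
val sc = new SparkContext(conf)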
Re: Is there a way to get previous/other keys' state in Spark Streaming?
Hi TD, Thank you. Yes, it behaves as you described. Sorry for missing this point. Then my only concern is in the performance side - since Spark Streaming operates on all the keys everytime a new batch comes, I think it is fine when the state size is small. When the state size becomes big, say, a few GBs, if we still go through the whole key list, would the operation be a little inefficient then? Maybe I miss some points in Spark Streaming, which consider this situation. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The updateFunction given in updateStateByKey should be called on ALL the keys are in the state, even if there is no new data in the batch for some key. Is that not the behavior you see? What do you mean by show all the existing states? You have access to the latest state RDD by doing stateStream.foreachRDD(...). There you can do whatever operation on all the key-state pairs. TD On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you for the quick replying and backing my approach. :) 1) The example is this: 1. In the first 2 second interval, after updateStateByKey, I get a few keys and their states, say, (a - 1, b - 2, c - 3) 2. In the following 2 second interval, I only receive c and d and their value. But I want to update/display the state of a and b accordingly. * It seems I have no way to access the a and b and get their states. * also, do I have a way to show all the existing states? I guess the approach to solve this will be similar to what you mentioned for 2). But the difficulty is that, if I want to display all the existing states, need to bundle all the rest keys to one key. Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com wrote: For accessing previous version, I would do it the same way. :) 1. Can you elaborate on what you mean by that with an example? What do you mean by accessing keys? 2. Yeah, that is hard to do with the ability to do point lookups into an RDD, which we dont support yet. You could try embedding the related key in the values of the keys that need it. That is, B will is present in the value of key A. Then put this transformed DStream through updateStateByKey. TD
Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)
Hi Matt, The security group shouldn't be an issue; the ports listed in `spark_ec2.py` are only for communication with the outside world. How did you launch your application? I notice you did not launch your driver from your Master node. What happens if you did? Another thing is that there seems to be some inconsistency or missing pieces in the logs you posted. After an executor says driver disassociated, what happens in the driver logs? Is an exception thrown or something? It would be useful if you could also post your conf/spark-env.sh. Andrew 2014-07-17 14:11 GMT-07:00 Marcelo Vanzin van...@cloudera.com: Hi Matt, I'm not very familiar with setup on ec2; the closest I can point you at is to look at the launch_cluster in ec2/spark_ec2.py, where the ports seem to be configured. On Thu, Jul 17, 2014 at 1:29 PM, Matt Work Coarr mattcoarr.w...@gmail.com wrote: Thanks Marcelo! This is a huge help!! Looking at the executor logs (in a vanilla spark install, I'm finding them in $SPARK_HOME/work/*)... It launches the executor, but it looks like the CoarseGrainedExecutorBackend is having trouble talking to the driver (exactly what you said!!!). Do you know what the range of random ports that is used for the the executor-to-driver? Is that range adjustable? Any config setting or environment variable? I manually setup my ec2 security group to include all the ports that the spark ec2 script ($SPARK_HOME/ec2/spark_ec2.py) sets up in it's security groups. They included (for those listed above 1): 1 50060 50070 50075 60060 60070 60075 Obviously I'll need to make some adjustments to my EC2 security group! Just need to figure out exactly what should be in there. To keep things simple, I just have one security group for the master, slaves, and the driver machine. In listing the port ranges in my current security group I looked at the ports that spark_ec2.py sets up as well as the ports listed in the spark standalone mode documentation page under configuring ports for network security: http://spark.apache.org/docs/latest/spark-standalone.html Here are the relevant fragments from the executor log: Spark Executor Command: /cask/jdk/bin/java -cp ::/cask/spark/conf:/cask/spark/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/cask/spark/lib/datanucleus-api-jdo-3. 2.1.jar:/cask/spark/lib/datanucleus-rdbms-3.2.1.jar:/cask/spark/lib/datanucleus-core-3.2.2.jar -XX:MaxPermSize=128m -Dspark.akka.frameSize=100 -Dspark.akka. frameSize=100 -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787/user/CoarseGra inedScheduler 0 ip-10-202-8-45.ec2.internal 8 akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker app-20140717195146- ... 14/07/17 19:51:47 DEBUG NativeCodeLoader: Trying to load the custom-built native-hadoop library... 14/07/17 19:51:47 DEBUG NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path 14/07/17 19:51:47 DEBUG NativeCodeLoader: java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 14/07/17 19:51:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Falling back to shell based 14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping 14/07/17 19:51:48 DEBUG Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=30 14/07/17 19:51:48 DEBUG SparkHadoopUtil: running as user: ec2-user ... 14/07/17 19:51:48 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@ip-10-202-11-191.ec2.internal :46787/user/CoarseGrainedScheduler 14/07/17 19:51:48 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker 14/07/17 19:51:49 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker 14/07/17 19:53:29 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-202-8-45.ec2.internal:55670] - [akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787] disassociated! Shutting down. Thanks a bunch! Matt On Thu, Jul 17, 2014 at 1:21 PM, Marcelo Vanzin van...@cloudera.com wrote: When I meant the executor log, I meant the log of the process launched by the worker, not the worker. In my CDH-based Spark install, those end up in /var/run/spark/work. If you look at your worker log, you'll see it's launching the executor process. So there should be something there.
Re: Include permalinks in mail footer
On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I often find myself wanting to reference one thread from another, or from a JIRA issue. Right now I have to google the thread subject and find the link that way. +1
Re: spark streaming rate limiting from kafka
Bill, are you saying, after repartition(400), you have 400 partitions on one host and the other hosts receive nothing of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there are always a single executor working on this job. Even I use repartition, it seems that there is still a single executor. Does anyone has an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far Kafka Dstream can read on topic-partition (via offset for example). By setting this to a small number, it will force DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
Re: how to pass extra Java opts to workers for spark streaming jobs
Can you check in the environment tab of Spark web ui to see whether this configuration parameter is in effect? TD On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com wrote: I am using spark 0.9.0 and I am able to submit job to YARN, https://spark.apache.org/docs/0.9.0/running-on-yarn.html. I am trying to turn on gc logging on executors but could not find a way to set extra Java opts for workers. I tried to set spark.executor.extraJavaOptions but that did not work. Any idea on how I should do this? -- Chen Song
Re: spark streaming rate limiting from kafka
You can create multiple kafka stream to partition your topics across them, which will run multiple receivers or multiple executors. This is covered in the Spark streaming guide. http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving And for the purpose of this thread, to answer the original question, we now have the ability https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC to limit the receiving rate. Its in the master branch, and will be available in Spark 1.1. It basically sets the limits at the receiver level (so applies to all sources) on what is the max records per second that can will be received by the receiver. TD On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying, after repartition(400), you have 400 partitions on one host and the other hosts receive nothing of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there are always a single executor working on this job. Even I use repartition, it seems that there is still a single executor. Does anyone has an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far Kafka Dstream can read on topic-partition (via offset for example). By setting this to a small number, it will force DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
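A short Scala sketch of the parallel-receiver setup TD describes, assuming an existing StreamingContext named ssc; the ZooKeeper quorum, consumer group and topic name are placeholders:

import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 4   // one receiver (and thus one receiving task) per stream
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("my-topic" -> 1))
}
val unified = ssc.union(kafkaStreams)
val repartitioned = unified.repartition(16)   // spread records over more partitions before heavy processing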
Re: Error with spark-submit (formatting corrected)
Hi ranjanp, If you go to the master UI (masterIP:8080), what does the first line say? Verify that this is the same as what you expect. Another thing is that --master in spark submit overwrites whatever you set MASTER to, so the environment variable won't actually take effect. Another obvious thing to check is whether the node from which you launch spark submit can access the internal address of the master (and port 7077). One quick way to verify that is to attempt a telnet into it. Let me know if you find anything. Andrew 2014-07-17 15:57 GMT-07:00 ranjanp piyush_ran...@hotmail.com: Hi, I am new to Spark and trying out with a stand-alone, 3-node (1 master, 2 workers) cluster. From the Web UI at the master, I see that the workers are registered. But when I try running the SparkPi example from the master node, I get the following message and then an exception. 14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master spark://10.1.3.7:7077... 14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory I searched a bit for the above warning, and found and found that others have encountered this problem before, but did not see a clear resolution except for this link: http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444 Based on the suggestion there I tried supplying --executor-memory option to spark-submit but that did not help. Any suggestions. Here are the details of my set up. - 3 nodes (each with 4 CPU cores and 7 GB memory) - 1 node configured as Master, and the other two configured as workers - Firewall is disabled on all nodes, and network communication between the nodes is not a problem - Edited the conf/spark-env.sh on all nodes to set the following: SPARK_WORKER_CORES=3 SPARK_WORKER_MEMORY=5G - The Web UI as well as logs on master show that Workers were able to register correctly. 
Also the Web UI correctly shows the aggregate available memory and CPU cores on the workers: URL: spark://vmsparkwin1:7077 Workers: 2 Cores: 6 Total, 0 Used Memory: 10.0 GB Total, 0.0 B Used Applications: 0 Running, 0 Completed Drivers: 0 Running, 0 Completed Status: ALIVE I try running the SparkPi example first using the run-example (which was failing) and later directly using the spark-submit as shown below: $ export MASTER=spark://vmsparkwin1:7077 $ echo $MASTER spark://vmsparkwin1:7077 azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0 $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10 The following is the full screen output: 14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser 14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(azureuser) 14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started 14/07/17 01:20:14 INFO Remoting: Starting remoting 14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839] 14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839] 14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker 14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster 14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at C:\cygwin\tmp\spark-local-20140717012014-b606 14/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9 MB. 14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with id = ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net ,49842) 14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager 14/07/17 01:20:14 INFO BlockManagerInfo: Registering block manager vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49842 with 294.9 MB RAM 14/07/17 01:20:14 INFO BlockManagerMaster: Registered BlockManager 14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server 14/07/17 01:20:14 INFO HttpBroadcast: Broadcast server started at http://10.1.3.7:49843 14/07/17 01:20:14 INFO HttpFileServer: HTTP File server directory is C:\cygwin\tmp\spark-6a076e92-53bb-4c7a-9e27-ce53a818146d 14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server 14/07/17 01:20:15 INFO SparkUI: Started SparkUI at http://vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:4040 14/07/17 01:20:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where
Re: how to pass extra Java opts to workers for spark streaming jobs
Hi Chen, spark.executor.extraJavaOptions is introduced in Spark 1.0, not in Spark 0.9. You need to export SPARK_JAVA_OPTS= -Dspark.config1=value1 -Dspark.config2=value2 in conf/spark-env.sh. Let me know if that works. Andrew 2014-07-17 18:15 GMT-07:00 Tathagata Das tathagata.das1...@gmail.com: Can you check in the environment tab of Spark web ui to see whether this configuration parameter is in effect? TD On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com wrote: I am using spark 0.9.0 and I am able to submit job to YARN, https://spark.apache.org/docs/0.9.0/running-on-yarn.html. I am trying to turn on gc logging on executors but could not find a way to set extra Java opts for workers. I tried to set spark.executor.extraJavaOptions but that did not work. Any idea on how I should do this? -- Chen Song
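For Spark 1.0 and later, the SPARK_JAVA_OPTS export above maps to a per-application setting; a sketch that turns on executor GC logging (the exact JVM flags are only an example):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")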
Re: jar changed on src filesystem
Hi Jian, In yarn-cluster mode, Spark submit automatically uploads the assembly jar to a distributed cache that all executor containers read from, so there is no need to manually copy the assembly jar to all nodes (or pass it through --jars). It seems there are two versions of the same jar in your HDFS. Try removing all old jars from your .sparkStaging directory and try again? Let me know if that does the job, Andrew 2014-07-16 23:42 GMT-07:00 cmti95035 cmti95...@gmail.com: They're all the same version. Actually even without the --jars parameter it got the same error. Looks like it needs to copy the assembly jar for running the example jar anyway during the staging. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jar-changed-on-src-filesystem-tp10011p10017.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming
Step 1: use rdd.mapPartitions(). That's equivalent to the mapper of MapReduce. You can open a connection, get all the data and buffer it, close the connection, and return an iterator over the buffer. Step 2: Make step 1 better by making it reuse connections. You can use singletons / static vars to lazily initialize and reuse a pool of connections. You will have to take care of concurrency, as multiple tasks may be using the database in parallel in the same worker JVM. TD On Thu, Jul 17, 2014 at 4:41 PM, Guangle Fan fanguan...@gmail.com wrote: Hi All, When I run Spark Streaming, in one of the flatMap stages I want to access a database. The code looks like: stream.flatMap( new FlatMapFunction { call () { // access database cluster } } ) Since I don't want to create a database connection every time call() is invoked, where is the best place to create the connection and reuse it on a per-host basis (like one database connection per Mapper/Reducer)? Regards, Guangle
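A minimal Scala sketch of TD's two steps, assuming a hypothetical DbClient class with a lookup(...) method and a DStream named stream; the singleton object yields one lazily created client per executor JVM, reused across tasks and batches (it must be thread-safe, or be replaced by a pool, if tasks run concurrently in the same JVM):

object DbClientHolder {
  lazy val client = new DbClient()   // hypothetical client; created once per worker JVM on first use
}

val enriched = stream.mapPartitions { records =>
  val db = DbClientHolder.client     // reused connection; nothing unserializable in the closure
  records.map(r => (r, db.lookup(r)))
}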
Re: Errors accessing hdfs while in local mode
Hi Chris, Did you ever figure this out? It should just work provided that your HDFS is set up correctly. If you don't call setMaster, it actually uses the spark://[master-node-ip]:7077 by default (this is configured in your conf/spark-env.sh). However, even if you use a local master, it should still work (I just tried this on my own EC2 cluster). By the way, SPARK_MASTER is actually deprecated. Instead, please use bin/spark-submit --master [your master]. Andrew 2014-07-16 23:46 GMT-07:00 Akhil Das ak...@sigmoidanalytics.com: You can try the following in the spark-shell: 1. Run it in *Clustermode* by going inside the spark directory: $ SPARK_MASTER=spark://masterip:7077 ./bin/spark-shell val textFile = sc.textFile(hdfs://masterip/data/blah.csv) textFile.take(10).foreach(println) 2. Now try running in *Localmode:* $ SPARK_MASTER=local ./bin/spark-shell val textFile = sc.textFile(hdfs://masterip/data/blah.csv) textFile.take(10).foreach(println) Both should print the first 10 lines from your blah.csv file.
Re: Is there a way to get previous/other keys' state in Spark Streaming?
Yes, this is the limitation of the current implementation. But this will be improved a lot when we have IndexedRDD (https://github.com/apache/spark/pull/1297) in Spark, which allows faster single-value updates to key-value state within each partition, without processing the entire partition. Soon. TD On Thu, Jul 17, 2014 at 5:57 PM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you. Yes, it behaves as you described. Sorry for missing this point. Then my only concern is on the performance side - since Spark Streaming operates on all the keys every time a new batch comes, I think it is fine when the state size is small. When the state size becomes big, say, a few GBs, if we still go through the whole key list, would the operation become inefficient? Maybe I am missing something in Spark Streaming that handles this situation. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The updateFunction given in updateStateByKey should be called on ALL the keys that are in the state, even if there is no new data in the batch for some key. Is that not the behavior you see? What do you mean by show all the existing states? You have access to the latest state RDD by doing stateStream.foreachRDD(...). There you can run whatever operation you want on all the key-state pairs. TD On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you for the quick reply and for backing my approach. :) 1) The example is this: 1. In the first 2-second interval, after updateStateByKey, I get a few keys and their states, say, (a - 1, b - 2, c - 3) 2. In the following 2-second interval, I only receive c and d and their values. But I want to update/display the states of a and b accordingly. * It seems I have no way to access a and b and get their states. * Also, do I have a way to show all the existing states? I guess the approach to solve this will be similar to what you mentioned for 2). But the difficulty is that, if I want to display all the existing states, I need to bundle all the remaining keys into one key. Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com wrote: For accessing the previous version, I would do it the same way. :) 1. Can you elaborate on what you mean by that with an example? What do you mean by accessing keys? 2. Yeah, that is hard to do without the ability to do point lookups into an RDD, which we don't support yet. You could try embedding the related key in the values of the keys that need it. That is, B is present in the value of key A. Then put this transformed DStream through updateStateByKey. TD
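To make the updateStateByKey / foreachRDD points above concrete, a minimal Scala sketch (assuming a DStream[(String, Int)] named pairs; the names are illustrative only):

  val updateFunc = (newValues: Seq[Int], state: Option[Int]) => {
    // called for every key already in the state, even when newValues is empty for this batch
    Some(state.getOrElse(0) + newValues.sum)
  }
  val stateStream = pairs.updateStateByKey[Int](updateFunc)

  stateStream.foreachRDD { rdd =>
    // rdd is the latest state RDD and contains ALL keys seen so far, not just this batch's keys;
    // collect() is only sensible here when the state is small
    rdd.collect().foreach { case (k, v) => println(k + " -> " + v) }
  }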
Re: Error with spark-submit (formatting corrected)
I think I know what is happening to you. I've looked into this a bit just this week, so it's fresh in my brain :) Hope this helps. When no workers are known to the master, if I recall correctly, you get this message. I think this is how it works. 1) You start your master 2) You start a slave, and give it the master URL as an argument. 3) The slave then binds to a random port 4) The slave then does a handshake with the master, which you can see in the slave logs (it says something like successfully connected to master at …. Actually, I think the master also logs that it now is aware of a slave running on ip:port… So in your case, I suspect, none of the slaves have connected to the master, so the job sits idle. This is similar to the YARN scenario of submitting a job to a resource manager with no node managers running. On Jul 17, 2014, at 6:57 PM, ranjanp piyush_ran...@hotmail.com wrote: Hi, I am new to Spark and trying it out with a stand-alone, 3-node (1 master, 2 workers) cluster. From the Web UI at the master, I see that the workers are registered. But when I try running the SparkPi example from the master node, I get the following message and then an exception. 14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master spark://10.1.3.7:7077... 14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory I searched a bit for the above warning, and found that others have encountered this problem before, but did not see a clear resolution except for this link: http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444 Based on the suggestion there I tried supplying the --executor-memory option to spark-submit but that did not help. Any suggestions? Here are the details of my setup: - 3 nodes (each with 4 CPU cores and 7 GB memory) - 1 node configured as Master, and the other two configured as workers - Firewall is disabled on all nodes, and network communication between the nodes is not a problem - Edited the conf/spark-env.sh on all nodes to set the following: SPARK_WORKER_CORES=3 SPARK_WORKER_MEMORY=5G - The Web UI as well as logs on master show that Workers were able to register correctly.
Also the Web UI correctly shows the aggregate available memory and CPU cores on the workers: URL: spark://vmsparkwin1:7077 Workers: 2 Cores: 6 Total, 0 Used Memory: 10.0 GB Total, 0.0 B Used Applications: 0 Running, 0 Completed Drivers: 0 Running, 0 Completed Status: ALIVE I try running the SparkPi example first using the run-example (which was failing) and later directly using the spark-submit as shown below: $ export MASTER=spark://vmsparkwin1:7077 $ echo $MASTER spark://vmsparkwin1:7077 azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0 $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10 The following is the full screen output: 14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser 14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(azureuser) 14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started 14/07/17 01:20:14 INFO Remoting: Starting remoting 14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839] 14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839] 14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker 14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster 14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at C:\cygwin\tmp\spark-local-20140717012014-b606 14/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9 MB. 14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with id = ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net,49842) 14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager 14/07/17 01:20:14 INFO BlockManagerInfo: Registering block manager vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49842 with 294.9 MB RAM 14/07/17 01:20:14 INFO BlockManagerMaster: Registered BlockManager 14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server 14/07/17 01:20:14 INFO HttpBroadcast: Broadcast server started at http://10.1.3.7:49843 14/07/17 01:20:14 INFO HttpFileServer: HTTP File server directory is
iScala or Scala-notebook
Hey everyone, I know this was asked before but I'm wondering if there have since been any updates. Are there any plans to integrate iScala/Scala-notebook with Spark in the near future? This seems like something a lot of people would find very useful, so I was just wondering if anyone has started working on it. Thanks, Eric -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/iScala-or-Scala-notebook-tp10127.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Cannot connect to hive metastore
Seems like the mysql connector jar is not included in the classpath. Where can I add the jar to the classpath? hive-site.xml:
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true&amp;characterEncoding=UTF-8</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>
Log: 14/07/18 11:46:58 ERROR DDLTask: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1143) at org.apache.hadoop.hive.ql.metadata.Hive.databaseExists(Hive.java:1128) at org.apache.hadoop.hive.ql.exec.DDLTask.showTables(DDLTask.java:2236) at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:333) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:151) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:65) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1414) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1192) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1020) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:189) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:163) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:250) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:100) at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:75) at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:78) at $line9.$read$$iwC$$iwC$$iwC$$iwC.init(console:15) at $line9.$read$$iwC$$iwC$$iwC.init(console:20) at $line9.$read$$iwC$$iwC.init(console:22) at $line9.$read$$iwC.init(console:24) at $line9.$read.init(console:26) at $line9.$read$.init(console:30) at $line9.$read$.clinit(console) at $line9.$eval$.init(console:7) at $line9.$eval$.clinit(console) at $line9.$eval.$print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608) at 
org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
Re: how to pass extra Java opts to workers for spark streaming jobs
Thanks Andrew. Say that I want to turn on CMS GC for each worker. All I need to do is add the following line to conf/spark-env.sh on the node where I submit the application: -XX:+UseConcMarkSweepGC Is that correct? Will this option be propagated to each worker in YARN? On Thu, Jul 17, 2014 at 9:26 PM, Andrew Or and...@databricks.com wrote: Hi Chen, spark.executor.extraJavaOptions was introduced in Spark 1.0, not in Spark 0.9. You need to export SPARK_JAVA_OPTS="-Dspark.config1=value1 -Dspark.config2=value2" in conf/spark-env.sh. Let me know if that works. Andrew 2014-07-17 18:15 GMT-07:00 Tathagata Das tathagata.das1...@gmail.com: Can you check in the environment tab of the Spark web UI to see whether this configuration parameter is in effect? TD On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com wrote: I am using Spark 0.9.0 and I am able to submit jobs to YARN, https://spark.apache.org/docs/0.9.0/running-on-yarn.html. I am trying to turn on GC logging on executors but could not find a way to set extra Java opts for workers. I tried to set spark.executor.extraJavaOptions but that did not work. Any idea on how I should do this? -- Chen Song -- Chen Song
Re: Spark Streaming Json file groupby function
Hi TD, It Worked...Thank you so much for all your help. Thanks, -Srinivas. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10132.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: how to pass extra Java opts to workers for spark streaming jobs
You will need to include that in the SPARK_JAVA_OPTS environment variable, so add the following line to spark-env.sh: export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC" This should propagate to the executors. (Though you should double-check, since 0.9 is a little old and I could be forgetting something.) If you wish to add Spark options in addition to this, simply append them to the environment variable: export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC -Dspark.config.one=value -Dspark.config.two=value" (Please note that this is only for Spark 0.9. The part where we set Spark options within SPARK_JAVA_OPTS is deprecated as of 1.0.) 2014-07-17 21:08 GMT-07:00 Chen Song chen.song...@gmail.com: Thanks Andrew. Say that I want to turn on CMS GC for each worker. All I need to do is add the following line to conf/spark-env.sh on the node where I submit the application: -XX:+UseConcMarkSweepGC Is that correct? Will this option be propagated to each worker in YARN? On Thu, Jul 17, 2014 at 9:26 PM, Andrew Or and...@databricks.com wrote: Hi Chen, spark.executor.extraJavaOptions was introduced in Spark 1.0, not in Spark 0.9. You need to export SPARK_JAVA_OPTS="-Dspark.config1=value1 -Dspark.config2=value2" in conf/spark-env.sh. Let me know if that works. Andrew 2014-07-17 18:15 GMT-07:00 Tathagata Das tathagata.das1...@gmail.com: Can you check in the environment tab of the Spark web UI to see whether this configuration parameter is in effect? TD On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com wrote: I am using Spark 0.9.0 and I am able to submit jobs to YARN, https://spark.apache.org/docs/0.9.0/running-on-yarn.html. I am trying to turn on GC logging on executors but could not find a way to set extra Java opts for workers. I tried to set spark.executor.extraJavaOptions but that did not work. Any idea on how I should do this? -- Chen Song -- Chen Song
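Since the original goal was GC logging on the executors, the spark-env.sh line would probably look something like this (a hedged sketch: the GC-logging flags are standard HotSpot options, and the log file path is just a placeholder):
  export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/tmp/executor-gc.log"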
Last step of processing is using too much memory.
Hello, I have an issue where my Spark code is using too much memory in the final step (a count for testing purposes; it will write the result to a DB when it works). I'm really not too sure how I can break down the last step to use less RAM. So, basically my data is log lines and each log line has a session id. I want to group by session to reconstruct the events of a session for BI purposes. So my steps are:
- Load the log lines
- Do a map to create a K,V for each log line
- Do a groupByKey.
- Do a final map on the log lines to rebuild my session.
- Do a count to trigger everything.
That did not work at all. I let it run for 35 minutes and all it was doing was disk read/write, all the CPUs were blocked on IO wait, and I had 1% free memory. So, I thought that I could help by reading my log lines in chunks of 1 200 000 lines and THEN doing a groupByKey on that subset. After everything was done, I would just combine all my RDDs with + and do a final groupByKey pass. The result is still the same: heavy disk swapping, 1% memory left, and all the CPUs are doing IO wait. It looks like:
- Load a subset
- Do a map to create a K,V for each log line
- Do a groupByKey.
- Add all the subset RDDs together.
- Do a final groupByKey.
- Do a count.
I can post the code if it would help, but there's a lot of code confusing the issue that's used to extract the logs from MongoDB with a flatMap. This is the memory usage of each process; it's an issue because I have 12GB of RAM on that machine:
VIRT     RES     SHR   S  %CPU  TIME+    COMMAND
3378712  2.646g  700   D  0.3   0:21.30  python
3377568  2.566g  700   D  0.0   0:20.80  python
3374984  2.485g  700   D  0.0   0:20.29  python
3375588  2.449g  700   D  0.3   0:20.62  python
3495560  206908  3920  S  1.3   0:45.36  java
If I look at the swap space with free, same thing, there's no memory left to swap out from buffer/cache:
                      total      used     free   shared  buffers   cached
Mem:               12305524  12159320   146204       20     1072    29036
-/+ buffers/cache:           12129212   176312
Swap:               5857276   3885296  1971980
In the screenshot below, you can see the step it's stuck at. The substeps are in groups of 4 because I break down each sub-chunk into blocks of 4. http://apache-spark-user-list.1001560.n3.nabble.com/file/n10134/issue.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Last-step-of-processing-is-using-too-much-memory-tp10134.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
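For reference, the first pipeline described above corresponds roughly to this Scala sketch (the input path is a placeholder, and extractSessionId / buildSession are hypothetical stand-ins for the poster's MongoDB extraction and session-rebuild logic):

  // hypothetical stand-ins for the real parsing and session-rebuild code
  def extractSessionId(line: String): String = line.split(" ")(0)
  def buildSession(id: String, lines: Iterable[String]): (String, Int) = (id, lines.size)

  val logLines = sc.textFile("hdfs:///logs/loglines.txt")                     // load the log lines
  val keyed    = logLines.map(line => (extractSessionId(line), line))         // K,V per log line, keyed by session id
  val grouped  = keyed.groupByKey()                                           // gathers every line of a session together
  val sessions = grouped.map { case (id, lines) => buildSession(id, lines) }  // rebuild each session
  println(sessions.count())                                                   // trigger the whole job

One thing worth noting about this shape: groupByKey has to buffer all the values for each key, so memory pressure in the last step tends to track the size of the largest sessions rather than the size of the input.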
spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
when I run a query to a hadoop file. mobile.registerAsTable(mobile) val count = sqlContext.sql(select count(1) from mobile) res5: org.apache.spark.sql.SchemaRDD = SchemaRDD[21] at RDD at SchemaRDD.scala:100 == Query Plan == ExistingRdd [data_date#0,mobile#1,create_time#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176 when I run collect. count.collect() It throws exceptions, Can anyone help me ? Job aborted due to stage failure: Task 3.0:22 failed 4 times, most recent failure: Exception failure in TID 153 on host wh-8-210: java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$ $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19) $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$1.next(Iterator.scala:853) scala.collection.Iterator$$anon$1.head(Iterator.scala:840) org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:181) org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:176) org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) java.lang.Thread.run(Thread.java:722) Driver stacktrace: java.lang.ExceptionInInitializerError at $line11.$read$$iwC.init(console:6) at $line11.$read.init(console:26) at $line11.$read$.init(console:30) at $line11.$read$.clinit(console) at $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19) at $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$1.next(Iterator.scala:853) at scala.collection.Iterator$$anon$1.head(Iterator.scala:840) at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:181) at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:176) at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at 
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) My classpath is : /app/hadoop/spark-1.0.1/assembly/target/scala-2.10/spark-assembly-1.0.1-hadoop0.20.2-cdh3u5.jar System Classpath /app/hadoop/spark-1.0.1/confSystem Classpath /app/hadoop/spark-1.0.1/lib_managed/jars/JavaEWAH-0.3.2.jar System Classpath /app/hadoop/spark-1.0.1/lib_managed/jars/JavaEWAH-0.6.6.jar System Classpath /app/hadoop/spark-1.0.1/lib_managed/jars/ST4-4.0.4.jar System Classpath /app/hadoop/spark-1.0.1/lib_managed/jars/activation-1.1.jar System Classpath /app/hadoop/spark-1.0.1/lib_managed/jars/akka-actor_2.10-2.2.3-shaded-protobuf.jar System Classpath /app/hadoop/spark-1.0.1/lib_managed/jars/algebird-core_2.10-0.1.11.jar System Classpath