Re: cannot access port 4040
Hi Maria, Have you tried port 8080 as well? Thanks Himanshu -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cannot-access-port-4040-tp23248p23249.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: cannot access port 4040
Hi Akhil, (your reply does not appear on the mailing list, but I received an email, so I will reply here). I already have an application running in the shell using pyspark. I can see the application running on port 8080, but I cannot access it through port 4040: the connection times out after a while. I tried relaunching my cluster using the spark-ec2 script, but still no success. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cannot-access-port-4040-tp23248p23251.html
Re: cannot access port 4040
Hi Akhil, Thanks for your reply! I still cannot see port 4040 on my machine when I type master-ip-address:4040 in my browser. I have tried this command: netstat -nat | grep 4040 and it returns this: tcp 0 0 :::4040 :::* LISTEN Logging into my master is not a problem, since I can access port 8080 by typing master-ip-address:8080 in my browser. I have made sure that spark.ui.enabled was set to True by launching my application using: ~/spark/bin/pyspark --conf spark.ui.enabled=True. I don't know if this is a symptom of my problem, but it might be another piece of useful information: when I look at Completed Applications on port 8080, I see my two previous applications. One of them says cores: 160, the last one has cores: 0. Could this be a clue? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cannot-access-port-4040-tp23248p23252.html
cannot access port 4040
Hi, I am using Spark 1.3.1 standalone and I have a problem: my cluster is working fine, I can see port 8080 and check that my EC2 instances are fine, but I cannot access port 4040. I have tried sbin/stop-all.sh, sbin/stop-master.sh, and exiting the spark context and restarting it, all to no avail. Any clues on what to try next? Thanks, Maria -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cannot-access-port-4040-tp23248.html
Fwd: Re: How to keep a SQLContext instance alive in a spark streaming application's life cycle?
Note: CCing user@spark.apache.org First, you must check if the RDD is empty: messages.foreachRDD { rdd => if (!rdd.isEmpty) { }} Now, you can obtain the instance of a SQLContext: val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) *Optional* At the moment I like to work with DataFrames, so I convert the RDD to a DataFrame. I see that you receive JSON: val df: DataFrame = sqlContext.jsonRDD(message, getSchema(getSchemaStr)).toDF() My getSchema function creates a schema for my JSON: def getSchemaStr(): String = "feature1 feature2 ..." def getSchema(schema: String): StructType = StructType(schema.split(" ").map(fieldName => StructField(fieldName, StringType, true))) I hope this helps. Regards. 2015-06-09 17:36 GMT+02:00 codingforfun [via Apache Spark User List] ml-node+s1001560n23226...@n3.nabble.com: I don't understand. When you said “Why? I tried this solution and it works fine.”, do you mean your SQLContext instance stays alive for the whole lifetime of the streaming application, rather than for one batch duration? My code is below: object SQLContextSingleton extends java.io.Serializable { @transient private var instance: SQLContext = null // Instantiate SQLContext on demand def getInstance(sparkContext: SparkContext): SQLContext = synchronized { if (instance == null) { instance = new SQLContext(sparkContext) } instance } } // type_->typex, id_->id, url_->url case class (time: Timestamp, id: Int, openfrom: Int, tab: Int) extends Serializable case class Count(x: Int) @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000)) ssc.checkpoint(".") 
val kafkaParams = Map("metadata.broker.list" -> "10.20.30.40:9092,") @transient val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name")) @transient val dddstream = newsIdDStream.map(x => x._2).flatMap(x => x.split("\n")) dddstream.foreachRDD { rdd => SQLContextSingleton.getInstance(rdd.sparkContext).jsonRDD(rdd).registerTempTable("ttable") val ret = SQLContextSingleton.getInstance(rdd.sparkContext).sql("SELECT COUNT(*) FROM ttable") ret.foreach { x => println(x(0)) } } ssc.start() ssc.awaitTermination() On 2015-06-09 17:41:44, drarse [via Apache Spark User List] wrote: Why? I tried this solution and it works fine. On Tuesday, June 9, 2015, codingforfun [via Apache Spark User List] wrote: Hi drarse, thanks for replying, but using a singleton object as you suggested does not work. On 2015-06-09 16:24:25, drarse [via Apache Spark User List] wrote: The best way is to create a singleton object like: object SQLContextSingleton { @transient private var instance: SQLContext = null // Instantiate SQLContext on demand def getInstance(sparkContext: SparkContext): SQLContext = synchronized { if (instance == null) { instance = new SQLContext(sparkContext) } instance }} You can find more information in the programming guide: https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations 2015-06-09 9:27 GMT+02:00 codingforfun [via Apache Spark User List]: I used SQLContext in a Spark Streaming application as below: case class topic_name (f1: Int, f2: Int) val sqlContext = new SQLContext(sc) @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000)) ssc.checkpoint(".") 
val theDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name")) theDStream.map(x => x._2).foreach { rdd => sqlContext.jsonRDD(newsIdRDD).registerTempTable("topic_name") sqlContext.sql("select count(*) from topic_name").foreach { x => WriteToFile(file_path, x(0).toString) } } ssc.start() ssc.awaitTermination() I found I could only get the message count for each 5-second batch, because "The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame". I guess a new sqlContext is created every 5 seconds, so the temporary table only lives for 5 seconds. I want the sqlContext and the temporary table to stay alive for the whole life cycle of the streaming application. How can I do that? Thanks~ -- If you reply to this email, your message will be added to the discussion below: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-a-SQLContext-instance-alive-in-a-spark-streaming-application-s-life-cycle-tp23215.html To
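The singleton pattern drarse suggests can be illustrated without a Spark dependency. In this minimal sketch, `FakeSqlContext` is a hypothetical stand-in for `SQLContext`, not a Spark class. It shows only that `getInstance` creates the object once and then returns the same instance to every batch, so anything registered on it (here, temporary table names) stays visible across batches for as long as the driver JVM holds the object.

```scala
import scala.collection.mutable

// Hypothetical stand-in for SQLContext (not a Spark class): it only remembers
// the names of the temporary tables registered on it.
class FakeSqlContext {
  val tempTables: mutable.Set[String] = mutable.Set.empty[String]
  def registerTempTable(name: String): Unit = tempTables += name
}

// The same lazily initialised, synchronized singleton pattern as SQLContextSingleton.
object FakeSqlContextSingleton {
  @transient private var instance: FakeSqlContext = null

  def getInstance(): FakeSqlContext = synchronized {
    if (instance == null) {
      instance = new FakeSqlContext
    }
    instance
  }
}

// Simulate three streaming batches, each asking the singleton for its context.
val contextsSeen = (1 to 3).map { batch =>
  val ctx = FakeSqlContextSingleton.getInstance()
  ctx.registerTempTable(s"batch_$batch")
  ctx
}
```

In the real application the equivalent call is `SQLContextSingleton.getInstance(rdd.sparkContext)` inside `foreachRDD`, as in the code quoted above.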
Re: cannot access port 4040
4040 is your driver's web UI port, so you need to have an application running. Log in to your cluster, start a spark-shell and try accessing 4040. Thanks Best Regards On Wed, Jun 10, 2015 at 3:51 PM, mrm ma...@skimlinks.com wrote: Hi, I am using Spark 1.3.1 standalone and I have a problem where my cluster is working fine, I can see the port 8080 and check that my ec2 instances are fine, but I cannot access port 4040. I have tried sbin/stop-all.sh, sbin/stop-master.sh, exiting the spark context and restarting it to no avail. Any clues on what to try next? Thanks, Maria -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cannot-access-port-4040-tp23248.html
Re: DataFrame.save with SaveMode.Overwrite produces 3x higher data size
Additionally, if I delete the parquet and recreate it using the same generic save function with 1000 partitions and overwrite the size is again correct. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-save-with-SaveMode-Overwrite-produces-3x-higher-data-size-tp23245p23250.html
Re: Join between DStream and Periodically-Changing-RDD
RDDs are immutable; why not join two DStreams? I'm not sure, but you could also try something like this: kvDstream.foreachRDD(rdd => { val file = ssc.sparkContext.textFile("/sigmoid/") val kvFile = file.map(x => (x.split(",")(0), x)) rdd.join(kvFile) }) Thanks Best Regards On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data data4...@gmail.com wrote: Hi, I'm trying to join a DStream with an interval of, say, 20s with an RDD loaded from an HDFS folder that changes periodically; say, a new file arrives in the folder every 10 minutes. How should this be done, considering that files in the folder are periodically changed or added? Does an RDD automatically detect changes in its HDFS source folder and reload itself? Thanks! Rendy
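Akhil's suggestion re-reads the HDFS data inside `foreachRDD`. One possible variation is to cache the lookup data on the driver and reload it only once it is older than the folder's update period. The sketch below models that idea in plain Scala: `RefreshingLookup`, its `load` function and the injectable clock are hypothetical, and each batch is an ordinary `Seq` rather than an RDD. Note also that in Spark `rdd.join(kvFile)` is a lazy transformation, so the joined RDD still needs an action (a save, `count()`, etc.) before anything is computed.

```scala
// Hypothetical helper: caches key/value lookup data and reloads it once it is
// at least refreshMillis old. `now` is injectable so the behaviour is testable.
class RefreshingLookup[K, V](
    load: () => Map[K, V],
    refreshMillis: Long,
    now: () => Long = () => System.currentTimeMillis()) {

  private var cached: Map[K, V] = load()
  private var loadedAt: Long = now()

  // Returns the cached data, reloading it first if it has gone stale.
  def current: Map[K, V] = synchronized {
    if (now() - loadedAt >= refreshMillis) {
      cached = load()
      loadedAt = now()
    }
    cached
  }
}

// Inner join of one batch of (key, value) pairs against the current lookup data.
def joinBatch[K, A, B](batch: Seq[(K, A)], lookup: RefreshingLookup[K, B]): Seq[(K, (A, B))] = {
  val table = lookup.current
  batch.flatMap { case (k, a) => table.get(k).map(b => (k, (a, b))) }
}
```

With a 20s batch interval and a 10-minute refresh, roughly thirty batches would share each loaded copy.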
Re: append file on hdfs
Hi, I have an idea to solve my problem: I want to write one file for each Spark partition, but I don't know how to get the actual partition suffix/ID in my call function. points.foreachPartition( new VoidFunction<Iterator<Tuple2<Integer, GeoTimeDataTupel>>>() { private static final long serialVersionUID = -7210897529331503565L; public void call(Iterator<Tuple2<Integer, GeoTimeDataTupel>> entry) throws Exception { while (entry.hasNext()) { Tuple2<Integer, GeoTimeDataTupel> temp = entry.next(); try { FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration()); Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results"); } catch (Exception e) { e.printStackTrace(); } } } } ); 2015-06-09 15:34 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com: Hi community, I want to append results to one file. When I run locally my function works correctly, but when I run it on a YARN cluster I lose some rows. Here is my write function: points.foreach( new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() { private static final long serialVersionUID = 2459995649387229261L; public void call(Tuple2<Integer, GeoTimeDataTupel> entry) throws Exception { try { FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration()); Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results"); if (fs.exists(pt)) { FSDataInputStream in = fs.open(pt); Path pt_temp = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results_temp"); backup(fs.getConf(), fs, in, pt_temp); in.close(); FSDataOutputStream out = fs.create((pt), true); FSDataInputStream backup = fs.open(pt_temp); int offset = 0; int bufferSize = 4096; int result = 0; byte[] buffer = new byte[bufferSize]; // pre read a part of content from input stream result = backup.read(offset, buffer, 0, bufferSize); // loop read input stream until it does not fill whole size of buffer while (result == bufferSize) { out.write(buffer); // read next segment from input stream by moving the offset pointer offset += bufferSize; result = backup.read(offset, buffer, 0, bufferSize); } if (result > 0 && result < bufferSize) { for (int i = 0; i < result; i++) { out.write(buffer[i]); } } out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n"); out.close(); } else { BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt))); bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n"); bw.close(); } } catch (Exception e) { e.printStackTrace(); } } public void backup(Configuration conf, FileSystem fs, FSDataInputStream sourceContent, Path pt_temp) throws Exception { FSDataOutputStream out = fs.create(pt_temp, true); IOUtils.copyBytes(sourceContent, out, 4096, false); out.close(); } Where is my mistake? Or is there a function to write (append) to Hadoop HDFS? Best regards, Paul
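Paul's own idea, one output file per partition instead of every task rewriting one shared file, can be sketched in plain Scala with local files standing in for HDFS. In a real job the partition index could be obtained with `mapPartitionsWithIndex`, which passes each partition's index to the function; here the partitions are just a `Seq` of `Seq`s, and `writePartitions` and the `part-NNNNN` file names are hypothetical choices for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Writes each partition's records to its own file, named after the partition
// index, so no two writers ever touch the same file.
def writePartitions(partitions: Seq[Seq[(Int, String)]], outDir: Path): Seq[Path] =
  partitions.zipWithIndex.map { case (records, index) =>
    val file = outDir.resolve(f"part-$index%05d")
    // Same line format as the original Java code: "Cluster: <id>, Point: <point>".
    val lines = records.map { case (cluster, point) => s"Cluster: $cluster, Point: $point\n" }
    Files.write(file, lines.mkString.getBytes(StandardCharsets.UTF_8))
    file
  }
```

Because every index is distinct, the writers need no read-copy-append cycle on a shared file.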
DataFrame.save with SaveMode.Overwrite produces 3x higher data size
Hi, Kudos on Spark 1.3.x, it's a great release - loving data frames! One thing I noticed after upgrading is that if I use the generic DataFrame save function with Overwrite mode and a parquet source, it produces a much larger output Parquet file. Source JSON data: ~500GB; originally saved Parquet: ~30GB in 1000 partitions; overwritten Parquet: ~90GB in 1000 partitions. Now the really strange thing is that if I overwrite that Parquet file again, it will again be ~30GB for 1000 parts. How can I get consistent behaviour? The overwrite mode is very useful for my use case. Thanks, Borislav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-save-with-SaveMode-Overwrite-produces-3x-higher-data-size-tp23245.html
Split RDD based on criteria
Hi, I'm gathering that the typical approach for splitting an RDD is to apply several filters to it: rdd1 = rdd.filter(func1); rdd2 = rdd.filter(func2); ... Is there (or should there be) a way to create 'buckets' like these in one go? List<RDD> rddList = rdd.filter(func1, func2, ..., funcN) Another angle: when applying filter(func), is there a way to get two RDDs back, one with the elements of the original RDD for which func returned true and another with the elements for which func returned false? PairRDD pair = rdd.filterTrueFalse(func); Right now I'm doing RDD x = rdd.filter(func); RDD y = rdd.filter(reverseOfFunc); This seems a bit tautological to me, though Spark must be optimizing this out (?) Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Split-RDD-based-on-criteria-tp23254.html
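On plain Scala collections both ideas already exist: `partition` returns the true/false halves in one traversal, and a single pass can assign each element to the first predicate it matches. The sketch below, using a hypothetical `bucketBy` helper, shows the intended semantics on an ordinary `Seq`; it is not an RDD API. On an RDD, each separate `filter` defines its own child RDD, so the parent is recomputed for each one unless it is cached.

```scala
// Assigns each element to the bucket of the first predicate it satisfies, in a
// single pass; elements matching no predicate land in one extra, final bucket.
def bucketBy[A](xs: Seq[A], preds: Seq[A => Boolean]): Vector[Vector[A]] = {
  val builders = Vector.fill(preds.size + 1)(Vector.newBuilder[A])
  xs.foreach { x =>
    val i = preds.indexWhere(p => p(x))
    builders(if (i >= 0) i else preds.size) += x
  }
  builders.map(_.result())
}

val nums = 1 to 10
// True/false split in one traversal with the standard library.
val (evens, odds) = nums.partition(_ % 2 == 0)
// Three-way split: multiples of 3, remaining evens, everything else.
val buckets = bucketBy(nums, Seq[Int => Boolean](_ % 3 == 0, _ % 2 == 0))
```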
Spark standalone mode and kerberized cluster
Hello all. I've been reading some old mails and noticed that Kerberos was not supported in a standalone cluster. Is this still the case? Thanks. Borja. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-mode-and-kerberized-cluster-tp23255.html
Re: cannot access port 4040
Opening port 4040 manually, or SSH tunneling (ssh -L 4040:127.0.0.1:4040 master-ip, then opening localhost:4040 in your browser), should work for you then. Thanks Best Regards On Wed, Jun 10, 2015 at 5:10 PM, mrm ma...@skimlinks.com wrote: Hi Akhil, Thanks for your reply! I still cannot see port 4040 in my machine when I type master-ip-address:4040 in my browser. I have tried this command: netstat -nat | grep 4040 and it returns this: tcp 0 0 :::4040 :::* LISTEN Logging into my master is not a problem since I can access port 8080 by writing master-ip-address:8080 in my browser. I have made sure that spark.ui.enabled was set to True by launching my application using: ~/spark/bin/pyspark --conf spark.ui.enabled=True. I don't know if this is a symptom of the problem that I have, but it might be another piece of useful information. When I look at Completed Applications in port 8080, I see my two previous applications. One of them says cores: 160, the last one has cores: 0. Could this be a clue? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cannot-access-port-4040-tp23248p23252.html
spark uses too much memory maybe (binaryFiles() with more than 1 million files in HDFS), groupBy or reduceByKey()
Both the driver (ApplicationMaster running on Hadoop) and the container (CoarseGrainedExecutorBackend) end up exceeding my 25GB allocation. My code is something like sc.binaryFiles(... 1mil xml files).flatMap( ... extract some domain classes, not many though, as each XML file usually has zero results).reduceByKey( reducer ).saveAsObjectFile() Initially I used groupBy, but that method uses a lot of resources (according to the javadocs). Switching to reduceByKey didn't have any effect. It seems Spark goes through 2 rounds of calculation of ~270k items. In the 1st round, around 15GB of memory is used and that memory is not reclaimed by the GC. That is true for both the driver and the container. In the 2nd round, it keeps allocating memory until it runs out and YARN kills it. Any ideas? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-uses-too-much-memory-maybe-binaryFiles-with-more-than-1-million-files-in-HDFS-groupBy-or-reduc-tp23253.html
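The semantic difference between the two operators can be shown with plain collections: a `groupBy`-style aggregation holds every value for a key before combining them, while a `reduceByKey`-style fold keeps a single running value per key. This is an in-memory sketch only (both helper names are hypothetical); it does not model Spark's shuffle or explain this particular job's memory growth.

```scala
// groupBy style: materialise all values for each key, then reduce each group.
def groupThenReduce[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
  pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

// reduceByKey style: fold each value into one accumulator per key as it arrives.
def reduceByKeyLocal[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
  pairs.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
    acc.updated(k, acc.get(k).fold(v)(prev => f(prev, v)))
  }

val pairs = Seq("a" -> 1, "b" -> 2, "a" -> 3, "c" -> 4, "b" -> 5)
```

Both produce the same result for an associative reducer; only the intermediate state differs.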
[Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Hi! I'm struggling with an issue with Spark 1.3.1 running on YARN, on an AWS EMR cluster. The cluster is based on AMI 3.7.0 (hence Amazon Linux 2015.03, Hive 0.13 already installed and configured on the cluster, Hadoop 2.4, etc.). I use the AWS emr-bootstrap-action *install-spark* (https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark) with the option/version *-v1.3.1e* so as to get the latest Spark for EMR installed and available. I also have a simple Spark Streaming driver in my project. The driver is part of a larger Maven project: in the *pom.xml* I'm currently using [...] <scala.binary.version>2.10</scala.binary.version> <scala.version>2.10.4</scala.version> <java.version>1.7</java.version> <spark.version>1.3.1</spark.version> <hadoop.version>2.4.1</hadoop.version> [...] <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> In fact, at compile and build time everything works just fine if, in my driver, I have: - val sparkConf = new SparkConf() .setAppName(appName) .set("spark.local.dir", "/tmp/" + appName) .set("spark.streaming.unpersist", "true") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[java.net.URI], classOf[String])) val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, config.batchDuration) import org.apache.spark.streaming.StreamingContext._ ssc.checkpoint(sparkConf.get("spark.local.dir") + checkpointRelativeDir) some input reading actions 
some input transformation actions val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) import sqlContext.implicits._ sqlContext.sql(an-HiveQL-query) ssc.start() ssc.awaitTerminationOrTimeout(config.timeout) --- What happens is that, right after being launched, the driver fails with the exception: 15/06/10 11:38:18 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:239) at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:235) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95) at myDriver.scala: line of the sqlContext.sql(query) Caused by some stuff Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found. NestedThrowables: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory ... Caused by: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory Suspecting a wrong Hive installation/configuration or library/classpath definition, I SSHed into the cluster and launched a spark-shell. Excluding the app configuration and StreamingContext usage/definition, I then carried out all the actions listed in the driver implementation, in particular all the Hive-related ones, and they all went through smoothly!
I also tried the optional *-h* argument (https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/README.md#arguments-optional) in the install-spark emr-bootstrap-action, but the driver failed in the very same way. Furthermore, when launching a spark-shell (on the EMR cluster with Spark installed with the -h option), I also got: 15/06/09 14:20:51 WARN conf.HiveConf: hive-default.xml not found on CLASSPATH 15/06/09 14:20:52 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/06/09 14:20:52 INFO metastore.ObjectStore: ObjectStore, initialize called 15/06/09 14:20:52 WARN DataNucleus.General: Plugin (Bundle) org.datanucleus is already registered. Ensure you dont have multiple JAR versions
learning rpc about spark core source code
Hi all, I have recently been studying the Spark 1.3 core source code, but I can't understand the RPC part: how do the client driver, worker and master communicate? There are some Scala files in the Spark core rpc module, such as RpcCallContext, RpcEndpointRef, RpcEndpoint and RpcEnv. Are there any blogs about it? Thank you very much!
Re: Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1 ClassNotFoundException
Hi Jeroen, Rather than bundling the Phoenix client JAR with your app, are you able to include it in a static location, either in the SPARK_CLASSPATH or by setting the conf values below (I use SPARK_CLASSPATH myself, though it's deprecated): spark.driver.extraClassPath spark.executor.extraClassPath Josh On Wed, Jun 10, 2015 at 4:11 AM, Jeroen Vlek j.v...@anchormen.nl wrote: Hi Josh, Thank you for your effort. Looking at your code, I feel that mine is semantically the same, except written in Java. The dependencies in the pom.xml all have the scope provided. The job is submitted as follows: $ rm spark.log && MASTER=spark://maprdemo:7077 /opt/mapr/spark/spark-1.3.1/bin/spark-submit --jars /home/mapr/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar,/home/mapr/projects/customer/lib/kafka_2.10-0.8.1.1.jar,/home/mapr/projects/customer/lib/zkclient-0.3.jar,/home/mapr/projects/customer/lib/metrics-core-3.1.0.jar,/home/mapr/projects/customer/lib/metrics-core-2.2.0.jar,lib/spark-sql_2.10-1.3.1.jar,/opt/mapr/phoenix/phoenix-4.4.0-HBase-0.98-bin/phoenix-4.4.0-HBase-0.98-client.jar --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector KafkaStreamConsumer.jar maprdemo:5181 0 topic jdbc:phoenix:maprdemo:5181 true The spark-defaults.conf is reverted back to its defaults (i.e. no userClassPathFirst). In the catch block of the Phoenix connection setup, the class path is printed by recursively iterating over the class loaders. The first one already prints the phoenix-client jar [1]. It's also very unlikely to be a bug in Spark or Phoenix, if your proof-of-concept just works. So if the JAR that contains the offending class is known by the class loader, then that might indicate that there's a second JAR providing the same class but with a different version, right? Yet, the only Phoenix JAR on the whole class path hierarchy is the aforementioned phoenix-client JAR.
Furthermore, I googled the class in question, ClientRpcControllerFactory, and it really only exists in the Phoenix project. We're not talking about some low-level AOP Alliance stuff here ;) Maybe I'm missing some fundamental class loading knowledge; in that case I'd be very happy to be enlightened. This all seems very strange. Cheers, Jeroen [1] [file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./spark-streaming-kafka_2.10-1.3.1.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./kafka_2.10-0.8.1.1.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./zkclient-0.3.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./phoenix-4.4.0-HBase-0.98-client.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./spark-sql_2.10-1.3.1.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./metrics-core-3.1.0.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./KafkaStreamConsumer.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./metrics-core-2.2.0.jar] On Tuesday, June 09, 2015 11:18:08 AM Josh Mahonin wrote: This may or may not be helpful for your classpath issues, but I wanted to verify that basic functionality worked, so I made a sample app here: https://github.com/jmahonin/spark-streaming-phoenix This consumes events off a Kafka topic using Spark Streaming, and writes out event counts to Phoenix using the new phoenix-spark functionality: http://phoenix.apache.org/phoenix_spark.html It's definitely overkill, and it would probably be more efficient to use the JDBC driver directly, but it serves as a proof-of-concept. I've only tested this in local mode. To convert it to a full jobs JAR, I suspect that keeping all of the Spark and Phoenix dependencies marked as 'provided', and including the Phoenix client JAR in the Spark classpath, would work as well.
Good luck, Josh On Tue, Jun 9, 2015 at 4:40 AM, Jeroen Vlek j.v...@work.nl wrote: Hi, I posted a question with regard to Phoenix and Spark Streaming on StackOverflow [1]. Please find a copy of the question in this email below the first stack trace. I also already contacted the Phoenix mailing list and tried the suggestion of setting spark.driver.userClassPathFirst. Unfortunately that only pushed me further into dependency hell, which I tried to resolve until I hit a wall with an UnsatisfiedLinkError on Snappy. What I am trying to achieve: to save a stream from Kafka into Phoenix/HBase via Spark Streaming. I'm using MapR as a platform, and the original exception happens both on a 3-node cluster and on the MapR Sandbox (a VM for experimentation), in YARN and stand-alone mode. Further experimentation (like the saveAsNewHadoopApiFile below) was done only on the sandbox in standalone mode. Phoenix only supports Spark from 4.4.0 onwards, but I thought I could use a naive implementation that
Re: BigDecimal problem in parquet file
Hi Cheng, I am using the Spark 1.3.1 binary available for Hadoop 2.6. I am loading an existing parquet file, then repartitioning and saving it. Doing this gives the error. The code for this doesn't look like it is causing the problem; I have a feeling the source (the existing parquet) is the culprit. I created that parquet using a JdbcRDD (pulled from Microsoft SQL Server). First I saved the JdbcRDD as an object file on disk. Then I loaded it again, made a DataFrame from it using a schema, and saved it as parquet. Following is the code. For saving the JdbcRDD: name - fully qualified table name, pk - string for primary key, pklast - last id to pull val myRDD = new JdbcRDD( sc, () => DriverManager.getConnection(url, username, password), "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?", 1, lastpk, 1, JdbcRDD.resultSetToObjectArray) myRDD.saveAsObjectFile("rawdata/" + name); For applying the schema and saving the parquet: val myschema = schemamap(name) val myrdd = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name).map(x => org.apache.spark.sql.Row(x:_*)) val actualdata = sqlContext.createDataFrame(myrdd, myschema) actualdata.saveAsParquetFile("/home/bipin/stageddata/" + name) The schema StructType can be made manually, though I pull the table's metadata and build one. It is a simple string translation (see the SQL docs https://msdn.microsoft.com/en-us/library/ms378878%28v=sql.110%29.aspx and/or the Spark data types https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#data-types). That is how I created the parquet file. Any help to solve the issue is appreciated. Thanks Bipin On 9 June 2015 at 20:44, Cheng Lian lian.cs@gmail.com wrote: Would you please provide a snippet that reproduces this issue? What version of Spark were you using?
Cheng On 6/9/15 8:18 PM, bipin wrote: Hi, When I try to save my data frame as a parquet file I get the following error: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.types.Decimal at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220) at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192) at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171) at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) How can I fix this problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
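JdbcRDD fills the two `?` placeholders in Bipin's query with the lower and upper bound of each partition's key range, which is why the query needs the inclusive `? <= pk and pk <= ?` form. The sketch below, in plain Scala, shows one way an inclusive `[lowerBound, upperBound]` range can be cut into `numPartitions` contiguous sub-ranges. I believe it matches the arithmetic in JdbcRDD's `getPartitions`, but treat `splitRange` as a hypothetical approximation, not the Spark source.

```scala
// Splits the inclusive key range [lower, upper] into numPartitions contiguous,
// non-overlapping inclusive sub-ranges; each pair would fill the two '?' slots.
// BigInt avoids overflow when the range spans most of the Long domain.
def splitRange(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
  val lo = BigInt(lower)
  val length = BigInt(upper) - lo + BigInt(1)
  val n = BigInt(numPartitions)
  (0 until numPartitions).map { i =>
    val start = lo + BigInt(i) * length / n
    val end = lo + BigInt(i + 1) * length / n - BigInt(1)
    (start.toLong, end.toLong)
  }
}
```

With `numPartitions = 1`, as in Bipin's code, the single range is simply `(1, lastpk)`.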
Re: learning rpc about spark core source code
The new RPC interface is an internal module that was added in 1.4. It should not exist in 1.3. Where did you find it? For the communication between driver, worker and master, it still uses Akka. There is a pending PR to update them: https://github.com/apache/spark/pull/5392 Do you mean the communication between driver and executors? Because this is ongoing work, there is no blog post yet. But you can find more details in this umbrella JIRA: https://issues.apache.org/jira/browse/SPARK-5293 Best Regards, Shixiong Zhu 2015-06-10 20:33 GMT+08:00 huangzheng 1106944...@qq.com: Hi all, I have recently been studying the Spark 1.3 core source code, but I can’t understand the RPC part: how do the client driver, worker and master communicate? There are some Scala files in the Spark core rpc module, such as RpcCallContext, RpcEndpointRef, RpcEndpoint and RpcEnv. Are there any blogs about it? Thank you very much!
Re: Re: Re: Re: Re: Met OOM when fetching more than 1,000,000 rows.
Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (I don't have access to a cluster right now) but couldn't reproduce this issue. Your program just executed smoothly... :-/
Command line used to start the Thrift server:
./sbin/start-thriftserver.sh --driver-memory 4g --master local
SQL statements used to create the table with your data:
create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;
Tried this via Beeline:
select * from foo limit 160;
I also tried the Java program you provided. Could you also try to verify whether this single-node local mode works for you? I will investigate this with a cluster when I get a chance.
Cheng
On 6/10/15 5:19 PM, 姜超才 wrote:
With spark.sql.thriftServer.incrementalCollect set and driver memory set to 7G, things seem stable and simple: the query line runs through quickly, but when traversing the result set (while rs.hasNext), it quickly hits OOM: Java heap space. See attachment.
/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true --conf spark.shuffle.manager=sort --conf spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf spark.kryoserializer.buffer.mb=256 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar --conf spark.sql.thriftServer.incrementalCollect=true
Thanks, SuperJ
- Original Message -
*From:* Cheng Lian l...@databricks.com
*To:* 姜超才 jiangchao...@haiyisoft.com, Hester wang hester9...@gmail.com, user@spark.apache.org
*Subject:* Re: Re: Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:37:34 (Wed)
Also, if the data isn't confidential, would you mind sending me a compressed copy (don't cc user@spark.apache.org)?
Cheng
On 6/10/15 4:23 PM, 姜超才 wrote:
Hi Lian, thanks for your quick response. I forgot to mention that I have increased driver memory from 2G to 4G, which seems to give a minor improvement: the failure mode when fetching 1,400,000 rows changed from "OOM: GC overhead limit exceeded" to "lost worker heartbeat after 120s". I will try setting spark.sql.thriftServer.incrementalCollect and increasing driver memory further to 7G, and will send you the result.
Thanks, SuperJ
- Original Message -
*From:* Cheng Lian *To:* Hester wang , *Subject:* Re: Met OOM when fetching more than 1,000,000 rows. *Date:* 2015/06/10 16:15:47 (Wed)
Hi Xiaohan, would you please try setting spark.sql.thriftServer.incrementalCollect to true and increasing the driver memory size? This way, HiveThriftServer2 uses RDD.toLocalIterator rather than RDD.collect().iterator to return the result set. The key difference is that RDD.toLocalIterator retrieves a single partition at a time, thus avoiding holding the whole result set on the driver side. The memory issue happens on the driver side rather than the executor side, so tuning executor memory size doesn't help.
Cheng
On 6/10/15 3:46 PM, Hester wang wrote:
Hi Lian, I have run into a Spark SQL problem and would really appreciate it if you could give me some help! Below is a detailed description of the problem; for more information, the original code and the logs you may need are attached.
Problem: I want to query a table stored in Hive through the Spark SQL JDBC interface and fetch more than 1,000,000 rows, but I hit OOM.
sql = select * from TEMP_ADMIN_150601_01 limit XXX ;
My Env: 5 nodes = one master + 4 workers, 1000M network switch, Red Hat 6.5. Each node: 8G RAM, 500G hard disk. Java 1.6, Scala 2.10.4, Hadoop 2.6, Spark 1.3.0, Hive 0.13.
Data: A table of users and their electricity charges. About 1,600,000 rows, about 28MB. Each row occupies about 18 bytes.
2 columns: user_id String, total_num Double
Repro Steps:
1. Start Spark.
2. Start the Spark SQL Thrift server with this command: /usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf spark.driver.memory=2g --conf spark.shuffle.consolidateFiles=true --conf spark.shuffle.manager=sort --conf spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf spark.kryoserializer.buffer.mb=256 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar
3. Run the test code; see the attached file testHiveJDBC.java.
4. Get "OOM: GC overhead limit exceeded", "OOM: Java heap space" or "lost worker heartbeat after 120s"; see the attached logs.
Preliminary diagnosis:
1. When fetching
Re: PostgreSQL JDBC Classpath Issue
Hi George, I have the same issue. Did you manage to find a solution?
Best, /Shahab
On Wed, May 13, 2015 at 9:21 PM, George Adams g.w.adams...@gmail.com wrote:
Hey all, I seem to be having an issue with the PostgreSQL JDBC jar on my classpath. I've outlined the issue on Stack Overflow (http://stackoverflow.com/questions/30221677/spark-sql-postgresql-data-source-issues). I'm not sure how to fix this, since I built the uber jar using sbt-assembly and the final jar does contain org/postgresql/Driver.class.
-- George Adams, IV
Software Craftsman, Brand Networks, Inc.
(585) 902-8822
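One way to narrow this down is to check, from inside the job itself, whether the driver class can be loaded and which drivers java.sql.DriverManager actually exposes. The sketch below uses only the JDK; the class name JdbcDriverCheck is hypothetical, and org.postgresql.Driver is only a default argument. It assumes the problem is one of class visibility: DriverManager only hands out drivers whose class is visible to the calling code's class loader, so Driver.class being inside the uber jar does not by itself guarantee that DriverManager will use it.

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class JdbcDriverCheck {

    /** Returns true if the named class can be loaded (without initializing it) by this class's loader. */
    public static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, JdbcDriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Lists the class names of the JDBC drivers that DriverManager currently exposes to this caller. */
    public static List<String> registeredDrivers() {
        List<String> names = new ArrayList<>();
        for (Driver d : Collections.list(DriverManager.getDrivers())) {
            names.add(d.getClass().getName());
        }
        return names;
    }

    public static void main(String[] args) {
        String driverClass = args.length > 0 ? args[0] : "org.postgresql.Driver";
        System.out.println(driverClass + " loadable: " + isLoadable(driverClass));
        System.out.println("Drivers visible to DriverManager: " + registeredDrivers());
    }
}
```

Running the same two checks once on the driver and once inside a task (for example from a map over a small RDD) would show whether the two sides see the class differently.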
Re: Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1 ClassNotFoundException
Hi Josh, thank you for your effort. Looking at your code, I feel that mine is semantically the same, except written in Java. The dependencies in the pom.xml all have the scope provided. The job is submitted as follows:
$ rm spark.log
MASTER=spark://maprdemo:7077 /opt/mapr/spark/spark-1.3.1/bin/spark-submit --jars /home/mapr/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar,/home/mapr/projects/customer/lib/kafka_2.10-0.8.1.1.jar,/home/mapr/projects/customer/lib/zkclient-0.3.jar,/home/mapr/projects/customer/lib/metrics-core-3.1.0.jar,/home/mapr/projects/customer/lib/metrics-core-2.2.0.jar,lib/spark-sql_2.10-1.3.1.jar,/opt/mapr/phoenix/phoenix-4.4.0-HBase-0.98-bin/phoenix-4.4.0-HBase-0.98-client.jar --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector KafkaStreamConsumer.jar maprdemo:5181 0 topic jdbc:phoenix:maprdemo:5181 true
The spark-defaults.conf has been reverted to its defaults (i.e. no userClassPathFirst). In the catch block of the Phoenix connection setup, the class path is printed by recursively iterating over the class loaders. The first one already prints the phoenix-client jar [1]. It's also very unlikely to be a bug in Spark or Phoenix, given that your proof of concept just works.
So if the JAR that contains the offending class is known by the class loader, that might indicate that there's a second JAR providing the same class but with a different version, right? Yet the only Phoenix JAR in the whole class path hierarchy is the aforementioned phoenix-client JAR. Furthermore, I googled the class in question, ClientRpcControllerFactory, and it really only exists in the Phoenix project. We're not talking about some low-level AOP Alliance stuff here ;) Maybe I'm missing some fundamental class loading knowledge; in that case I'd be very happy to be enlightened. This all seems very strange.
Cheers, Jeroen
[1] [file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./spark-streaming-kafka_2.10-1.3.1.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./kafka_2.10-0.8.1.1.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./zkclient-0.3.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./phoenix-4.4.0-HBase-0.98-client.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./spark-sql_2.10-1.3.1.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./metrics-core-3.1.0.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./KafkaStreamConsumer.jar, file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./metrics-core-2.2.0.jar]
On Tuesday, June 09, 2015 11:18:08 AM Josh Mahonin wrote:
This may or may not be helpful for your classpath issues, but I wanted to verify that basic functionality worked, so I made a sample app here: https://github.com/jmahonin/spark-streaming-phoenix
This consumes events off a Kafka topic using Spark Streaming and writes out event counts to Phoenix using the new phoenix-spark functionality: http://phoenix.apache.org/phoenix_spark.html
It's definitely overkill, and it would probably be more efficient to use the JDBC driver directly, but it serves as a proof of concept. I've only tested this in local mode. To convert it to a full jobs JAR, I suspect that keeping all of the Spark and Phoenix dependencies marked as 'provided', and including the Phoenix client JAR in the Spark classpath, would work as well.
Good luck, Josh
On Tue, Jun 9, 2015 at 4:40 AM, Jeroen Vlek j.v...@work.nl wrote:
Hi, I posted a question regarding Phoenix and Spark Streaming on StackOverflow [1]. Please find a copy of the question in this email, below the first stack trace. I also already contacted the Phoenix mailing list and tried the suggestion of setting spark.driver.userClassPathFirst.
Unfortunately that only pushed me further into dependency hell, which I tried to resolve until I hit a wall with an UnsatisfiedLinkError on Snappy.
What I am trying to achieve: to save a stream from Kafka into Phoenix/HBase via Spark Streaming. I'm using MapR as a platform, and the original exception happens both on a 3-node cluster and on the MapR Sandbox (a VM for experimentation), in both YARN and standalone mode. Further experimentation (like the saveAsNewAPIHadoopFile below) was done only on the sandbox in standalone mode.
Phoenix only supports Spark from 4.4.0 onwards, but I thought I could use a naive implementation in 4.3.1 that creates a new connection for every RDD from the DStream. This resulted in the ClassNotFoundException described in [1], so I switched to 4.4.0.
Unfortunately the saveToPhoenix method is only available in Scala. I did find the suggestion to try it via the saveAsNewAPIHadoopFile method [2] and an example implementation [3], which I adapted to my own needs. However, 4.4.0 + saveAsNewAPIHadoopFile raises the same
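Jeroen's question (could a second JAR provide the same class?) can be checked directly by asking the class loader for every copy of the class file it can see. Below is a JDK-only sketch; ClassOriginCheck is a hypothetical name, and the argument would be the fully qualified name of ClientRpcControllerFactory, whose package is not given in this thread. It assumes the loaders worth inspecting are URLClassLoader instances; other loaders are only listed by type.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClassOriginCheck {

    /** Returns every location from which the loader (and its parents) can read the class file. */
    public static List<URL> findAllCopies(ClassLoader loader, String className) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Describes the loader chain from the given loader up to (but excluding) the bootstrap loader. */
    public static List<String> describeLoaderChain(ClassLoader loader) {
        List<String> chain = new ArrayList<>();
        for (ClassLoader cl = loader; cl != null; cl = cl.getParent()) {
            StringBuilder entry = new StringBuilder(cl.getClass().getName());
            // Only URLClassLoader exposes its search path; other loaders are listed by type only.
            if (cl instanceof URLClassLoader) {
                for (URL url : ((URLClassLoader) cl).getURLs()) {
                    entry.append("\n    ").append(url);
                }
            }
            chain.add(entry.toString());
        }
        return chain;
    }

    public static void main(String[] args) {
        String className = args.length > 0 ? args[0] : "java.lang.String";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        for (String entry : describeLoaderChain(loader)) {
            System.out.println(entry);
        }
        List<URL> copies = findAllCopies(loader, className);
        System.out.println(className + " found in " + copies.size() + " location(s):");
        for (URL url : copies) {
            System.out.println("  " + url);
        }
    }
}
```

More than one URL for the same class file would point to a duplicate on the class path; a single URL suggests the problem lies elsewhere, for example in which loader ends up defining the class.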
Re: Spark's Scala shell killing itself
Maybe you should update your Spark version to the latest one.
Thanks, Best Regards
On Wed, Jun 10, 2015 at 11:04 AM, Chandrashekhar Kotekar shekhar.kote...@gmail.com wrote:
Hi, I have configured Spark to run on YARN. Whenever I start the Spark shell using the 'spark-shell' command, it automatically gets killed. The output looks like this:
ubuntu@dev-cluster-gateway:~$ ls shekhar/
edx-spark
ubuntu@dev-cluster-gateway:~$ spark-shell
Welcome to [Spark ASCII-art banner] version 1.2.0-SNAPSHOT
Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
15/06/10 05:20:45 WARN Utils: Your hostname, dev-cluster-gateway resolves to a loopback address: 127.0.0.1; using 10.182.149.171 instead (on interface eth0)
15/06/10 05:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/06/10 05:21:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
/usr/lib/spark/bin/spark-shell: line 48: 15573 Killed $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main ${SUBMISSION_OPTS[@]} spark-shell ${APPLICATION_OPTS[@]}
Any clue why the Spark shell gets killed? Please let me know if any other configuration/information is required.
Thanks, Chandrash3khar Kotekar
Re: Re: Re: Met OOM when fetching more than 1,000,000 rows.
Also, if the data isn't confidential, would you mind sending me a compressed copy (don't cc user@spark.apache.org)?
Cheng
On 6/10/15 4:23 PM, 姜超才 wrote:
Hi Lian, thanks for your quick response. I forgot to mention that I have increased driver memory from 2G to 4G, which seems to give a minor improvement: the failure mode when fetching 1,400,000 rows changed from "OOM: GC overhead limit exceeded" to "lost worker heartbeat after 120s". I will try setting spark.sql.thriftServer.incrementalCollect and increasing driver memory further to 7G, and will send you the result.
Thanks, SuperJ
- Original Message -
*From:* Cheng Lian l...@databricks.com *To:* Hester wang hester9...@gmail.com, user@spark.apache.org *Subject:* Re: Met OOM when fetching more than 1,000,000 rows. *Date:* 2015/06/10 16:15:47 (Wed)
Hi Xiaohan, would you please try setting spark.sql.thriftServer.incrementalCollect to true and increasing the driver memory size? This way, HiveThriftServer2 uses RDD.toLocalIterator rather than RDD.collect().iterator to return the result set. The key difference is that RDD.toLocalIterator retrieves a single partition at a time, thus avoiding holding the whole result set on the driver side. The memory issue happens on the driver side rather than the executor side, so tuning executor memory size doesn't help.
Cheng
On 6/10/15 3:46 PM, Hester wang wrote:
Hi Lian, I have run into a Spark SQL problem and would really appreciate it if you could give me some help! Below is a detailed description of the problem; for more information, the original code and the logs you may need are attached.
Problem: I want to query a table stored in Hive through the Spark SQL JDBC interface and fetch more than 1,000,000 rows, but I hit OOM.
sql = select * from TEMP_ADMIN_150601_01 limit XXX ;
My Env: 5 nodes = one master + 4 workers, 1000M network switch, Red Hat 6.5. Each node: 8G RAM, 500G hard disk. Java 1.6, Scala 2.10.4, Hadoop 2.6, Spark 1.3.0, Hive 0.13.
Data: A table of users and their electricity charges. About 1,600,000 rows, about 28MB.
Each row occupies about 18 bytes. 2 columns: user_id String, total_num Double
Repro Steps:
1. Start Spark.
2. Start the Spark SQL Thrift server with this command: /usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf spark.driver.memory=2g --conf spark.shuffle.consolidateFiles=true --conf spark.shuffle.manager=sort --conf spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf spark.kryoserializer.buffer.mb=256 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar
3. Run the test code; see the attached file testHiveJDBC.java.
4. Get "OOM: GC overhead limit exceeded", "OOM: Java heap space" or "lost worker heartbeat after 120s"; see the attached logs.
Preliminary diagnosis:
1. When fetching fewer than 1,000,000 rows, it always succeeds.
2. When fetching more than 1,300,000 rows, it always fails with "OOM: GC overhead limit exceeded".
3. When fetching about 1,040,000-1,200,000 rows, a query issued right after the Thrift server starts up succeeds most of the time, but if I run the query successfully once and then retry the same query, it fails.
4. There are 3 failure patterns: "OOM: GC overhead limit exceeded", "OOM: Java heap space" or "lost worker heartbeat after 120s".
5. I tried starting the Thrift server with different configurations, giving the workers 4G or 2G of memory, and got the same behavior. That means that no matter how much memory the workers have, I can fetch fewer than 1,000,000 rows but cannot fetch more than 1,300,000 rows.
Preliminary conclusions:
1. The total data is less than 30MB, which is very small, and there is no complex computation involved, so the failure is not caused by excessive memory requirements. So I guess there is some defect in the Spark SQL code.
2. Allocating 2G or 4G of memory to each worker gives the same behavior.
This point strengthens my suspicion that there is some defect in the code, but I can't find the specific location.
Thank you so much!
Best, Xiaohan Wang
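Cheng's explanation of spark.sql.thriftServer.incrementalCollect (one partition at a time instead of the whole result set) can be illustrated with a toy model. This is plain Java, not Spark code: fetchPartition is a hypothetical stand-in for shipping one partition's rows to the driver, and the partition counts are arbitrary. It only compares the peak number of rows held at once by an eager, collect()-style strategy and by a lazy, toLocalIterator-style iterator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

/** Toy model (not Spark code) of collect().iterator versus toLocalIterator on the driver. */
public class ResultSetModel {

    /** Eager strategy: buffer every partition, then iterate. Returns the peak number of buffered rows. */
    public static int collectPeak(int numPartitions, IntFunction<List<String>> fetchPartition) {
        List<String> all = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            all.addAll(fetchPartition.apply(i));
        }
        return all.size(); // the whole result set is held at once
    }

    /** Lazy strategy: only the current partition is buffered; the next one is fetched on demand. */
    public static class LocalIterator implements Iterator<String> {
        private final int numPartitions;
        private final IntFunction<List<String>> fetchPartition;
        private int nextPartition = 0;
        private Iterator<String> current = Collections.emptyIterator();
        private int peakBuffered = 0;

        public LocalIterator(int numPartitions, IntFunction<List<String>> fetchPartition) {
            this.numPartitions = numPartitions;
            this.fetchPartition = fetchPartition;
        }

        @Override
        public boolean hasNext() {
            // Move on to the next non-empty partition; the previous buffer becomes garbage.
            while (!current.hasNext() && nextPartition < numPartitions) {
                List<String> rows = fetchPartition.apply(nextPartition++);
                peakBuffered = Math.max(peakBuffered, rows.size());
                current = rows.iterator();
            }
            return current.hasNext();
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current.next();
        }

        /** Largest number of rows buffered at any one time. */
        public int peakBuffered() {
            return peakBuffered;
        }
    }

    public static void main(String[] args) {
        // Hypothetical data: 16 partitions of 10,000 rows each.
        IntFunction<List<String>> fetch = p -> {
            List<String> rows = new ArrayList<>();
            for (int r = 0; r < 10_000; r++) {
                rows.add("user" + p + "_" + r);
            }
            return rows;
        };
        System.out.println("collect peak rows: " + collectPeak(16, fetch));
        LocalIterator it = new LocalIterator(16, fetch);
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        System.out.println("local iterator rows read: " + count + ", peak buffered: " + it.peakBuffered());
    }
}
```

In this model the driver's peak footprint drops from the whole result set to the largest single partition, while executor memory does not enter the picture at all, which is consistent with Cheng's point that tuning executor memory doesn't help.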
Re: How to use Apache spark mllib Model output in C++ component
Hopefully SWIG (http://www.swig.org/index.php) and JNA (https://github.com/twall/jna/) can help with accessing C++ libraries from Java.
Thanks, Best Regards
On Wed, Jun 10, 2015 at 11:50 AM, mahesht mahesh.s.tup...@gmail.com wrote:
There is a C++ component that uses a model which we want to replace with a Spark model's output, but there is no C++ API for reading the model. What is the best way to solve this problem?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Apache-spark-mllib-Model-output-in-C-component-tp23239.html
Re: Running SparkSql against Hive tables
On 6/10/15 1:55 AM, James Pirz wrote:
I am trying to use Spark 1.3 (standalone) against Hive 1.2 running on Hadoop 2.6. I looked at the ThriftServer2 logs and realized that the server was not starting properly because of a failure in creating a server socket. In fact, I had passed the URI of my HiveServer2 service, launched from Hive, so the Beeline in Spark was talking directly to Hive's HiveServer2 and just using it as a Hive service.
Good to know it's not a bug :)
I could fix starting the ThriftServer2 in Spark (by changing the port), but I guess the missing puzzle piece for me is: how does Spark SQL reuse the tables already created in Hive? I mean, do I have to write an application that uses HiveContext to do that and submit it to Spark for execution, or is there a way to run SQL scripts directly via the command line (in distributed mode, on the cluster), similar to the way one would use the Hive (or Shark) command line by passing a query file with the -f flag? Looking at the Spark SQL documentation, it seems that it is possible. Please correct me if I am wrong.
Yes, Spark SQL can access Hive tables by communicating with the Hive metastore to retrieve the metadata of these tables. After starting HiveThriftServer2, you should be able to use Beeline to run SQL scripts.
On Mon, Jun 8, 2015 at 6:56 PM, Cheng Lian lian.cs@gmail.com wrote:
On 6/9/15 8:42 AM, James Pirz wrote:
Thanks for the help! I am actually trying Spark SQL to run queries against tables that I've defined in Hive. I follow these steps:
- I start hiveserver2, and in Spark I start Spark's Thrift server with: $SPARK_HOME/sbin/start-thriftserver.sh --master spark://spark-master-node-ip:7077
- I start beeline: $SPARK_HOME/bin/beeline
- In my beeline session, I connect to my running hiveserver2: !connect jdbc:hive2://hive-node-ip:1
and I can run queries successfully. But based on the hiveserver2 logs, it seems it actually uses Hadoop's MR to run queries, *not* Spark's workers.
My goal is to access the Hive tables' data, but run queries through Spark SQL using Spark workers (not Hadoop).
Hm, interesting. HiveThriftServer2 should never issue MR jobs to perform queries. I did receive two reports in the past which also said MR jobs instead of Spark jobs were issued to perform the SQL query. However, I only reproduced this issue in a rare corner case, which uses HTTP mode to connect to Hive 0.12.0. Apparently this isn't your case. Would you mind providing more details so that I can dig in? The following information would be very helpful:
1. Hive version
2. A copy of your hive-site.xml
3. Hadoop version
4. Full HiveThriftServer2 log (which can be found in $SPARK_HOME/logs)
Thanks in advance!
Is it possible to do that via Spark SQL (its CLI) or through its Thrift server? (I tried to find some basic examples in the documentation, but I was not able to.) Any suggestion or hint on how I can do that would be highly appreciated.
Thnx
On Sun, Jun 7, 2015 at 6:39 AM, Cheng Lian lian.cs@gmail.com wrote:
On 6/6/15 9:06 AM, James Pirz wrote:
I am pretty new to Spark. Using Spark 1.3.1, I am trying to use Spark SQL to run some SQL scripts on the cluster. I realized that for better performance, it is a good idea to use Parquet files. I have 2 questions regarding that:
1) If I want to use Spark SQL against *partitioned bucketed* tables in Parquet format in Hive, does the Spark binary provided on the Apache website support that, or do I need to build a new Spark binary with some additional flags? (I found a note https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables in the documentation about enabling Hive support, but I could not fully understand what the correct way of building is, if I need to build.)
Yes, Hive support is now enabled by default for the binaries on the website. However, Spark SQL doesn't support buckets yet.
2) Does running Spark SQL against tables in Hive degrade performance, so that it is better to load Parquet files directly into HDFS, or is having Hive in the picture harmless?
If you're using Parquet, then it should be fine, since by default Spark SQL uses its own native Parquet support to read Parquet Hive tables.
Thnx
Re: Determining number of executors within RDD
Hi Akshat, I assume what you want is to control the number of partitions in your RDD, which is easily achievable by passing the numSlices or minSplits argument at the time of RDD creation, for example:
val someRDD = sc.parallelize(someCollection, numSlices)
val someRDD = sc.textFile(pathToFile, minSplits)
You can check the number of partitions your RDD has with 'someRDD.partitions.size'. If you want to reduce or increase the number of partitions, you can call the 'repartition(numPartition)' method, which reshuffles the data and partitions it into 'numPartition' partitions. And of course, if you want, you can also set the number of executors via the 'spark.executor.instances' property in the 'sparkConf' object.
Thank you.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Determining-number-of-executors-within-RDD-tp15554p23241.html
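To make the numSlices argument concrete: as far as I know, parallelize splits an in-memory collection of n elements into numSlices contiguous ranges, with slice i covering indices i*n/numSlices (inclusive) to (i+1)*n/numSlices (exclusive) under integer division. The plain-Java sketch below only mimics that arithmetic and does not call Spark; SliceSketch is a hypothetical name, and Spark's own implementation handles more cases than this.

```java
import java.util.ArrayList;
import java.util.List;

public class SliceSketch {

    /** Splits data into numSlices contiguous slices whose sizes differ by at most one. */
    public static <T> List<List<T>> slice(List<T> data, int numSlices) {
        if (numSlices < 1) {
            throw new IllegalArgumentException("numSlices must be positive: " + numSlices);
        }
        long n = data.size();
        List<List<T>> slices = new ArrayList<>(numSlices);
        for (int i = 0; i < numSlices; i++) {
            // Long arithmetic avoids overflow of i * n for large collections.
            int start = (int) ((i * n) / numSlices);
            int end = (int) (((i + 1) * n) / numSlices);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int k = 1; k <= 10; k++) {
            data.add(k);
        }
        List<List<Integer>> slices = slice(data, 3);
        System.out.println(slices.size() + " slices: " + slices);
    }
}
```

For the ten elements in main this prints "3 slices: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]"; asking for more slices than elements simply yields some empty slices.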
Re: spark-submit does not use hive-site.xml
Hm, this is a common confusion... Although the variable name is `sqlContext` in the Spark shell, it's actually a `HiveContext`, which extends `SQLContext` and has the ability to communicate with the Hive metastore. So your program needs to instantiate an `org.apache.spark.sql.hive.HiveContext` instead.
Cheng
On 6/10/15 10:19 AM, James Pirz wrote:
I am using Spark (standalone) to run queries (from a remote client) against data in tables that are already defined/loaded in Hive. I have started the metastore service in Hive successfully, and by putting hive-site.xml, with the proper metastore.uri, in the $SPARK_HOME/conf directory, I tried to share its config with Spark. When I start spark-shell, it gives me a default sqlContext, and I can use that to access my Hive tables with no problem. But once I submit a similar query via a Spark application through 'spark-submit', it does not see the tables, and it seems it does not pick up the hive-site.xml under the conf directory in Spark's home. I tried to use the '--files' argument with spark-submit to pass hive-site.xml to the workers, but it did not change anything. Here is how I try to run the application:
$SPARK_HOME/bin/spark-submit --class SimpleClient --master spark://my-spark-master:7077 --files=$SPARK_HOME/conf/hive-site.xml simple-sql-client-1.0.jar
Here is the simple example code that I try to run (in Java):
SparkConf conf = new SparkConf().setAppName("Simple SQL Client");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame res = sqlContext.sql("show tables");
res.show();
Here are the SW versions: Spark: 1.3, Hive: 1.2, Hadoop: 2.6
Thanks in advance for any suggestions.
Re: Re: Re: Re: How to decrease the time of storing block in memory
Thanks Ak, thanks for your idea. I had tried using Spark to do what the shell script did. However, it was not as fast as I expected, and not very easy.
Thanks & Best regards!
San.Luo
- Original Message -
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: Re: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 18:05
Hi 罗辉, I think you are interpreting the logs wrongly. Your program actually runs from this point (the rest is just startup and connecting):
15/06/08 16:14:22 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0
15/06/08 16:14:23 INFO storage.MemoryStore: ensureFreeSpace(1561) called with curMem=0, maxMem=370503843
15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1561.0 B, free 353.3 MB)
15/06/08 16:14:23 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/06/08 16:14:23 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 967 ms
15/06/08 16:14:23 INFO storage.MemoryStore: ensureFreeSpace(2168) called with curMem=1561, maxMem=370503843
15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 353.3 MB)
=> At this point it has already stored the broadcast piece in memory, and it starts your Task 0.
15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 693 bytes result sent to driver
=> It took 19s to finish your Task 0, and it starts Task 1 from this point:
15/06/08 16:14:42 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1
15/06/08 16:14:42 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
15/06/08 16:14:56 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 693 bytes result sent to driver
15/06/08 16:14:56 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
Now, to speed things up you need to obtain parallelism (at least 2-3 times the number of cores you have); it could be that your sort.sh is running on a single core. Perhaps, instead of triggering an external command, you could do the operation within Spark itself; that way you can always control the parallelism and so on. Hope it helps.
Thanks, Best Regards
On Tue, Jun 9, 2015 at 3:00 PM, luohui20...@sina.com wrote:
Hi Akhil, not exactly: the task took 54s to finish, starting at 16:14:02 and ending at 16:14:56. Within these 54s, it needs 19s to store a value in memory, starting at 16:14:23 and ending at 16:14:42. I think this is the most time-consuming part of the task, and it is also unreasonable. You may check the log attached in my previous mail. And here is my code:
import org.apache.spark._
object GeneCompare3 {
  def main(args: Array[String]) {
    // i: piece number, j: user number
    val i = args(0).toInt
    val j = args(1)
    val conf = new SparkConf().setAppName("CompareGenePiece " + i + " of User " + j).setMaster("spark://slave3:7077").set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    println("start to compare gene")
    val runmodifyshell2 = List("run", "sort.sh")
    val runmodifyshellRDD2 = sc.makeRDD(runmodifyshell2)
    val pipeModify2 = runmodifyshellRDD2.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/db/chr" + i + ".txt /opt/data/shellcompare/data/user" + j + "/pgs/sample/samplechr" + i + ".txt /opt/data/shellcompare/data/user" + j + "/pgs/intermediateResult/result" + i + ".txt 600")
    pipeModify2.collect()
    sc.stop()
  }
}
Thanks & Best regards!
San.Luo
- Original Message -
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 16:51
Is it that task which is taking 19s?
It won't simply be taking 19s to store 2KB of data in memory; there could be other operations happening too (the transformations that you are doing). It would be good if you could paste the code snippet you are running, so we can get a better understanding.
Thanks, Best Regards
On Tue, Jun 9, 2015 at 2:09 PM, luohui20...@sina.com wrote:
Only 1 minor GC, 0.07s.
Thanks & Best regards!
San.Luo
- Original Message -
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 15:02
Maybe you should check your driver UI and see whether there's any GC time involved, etc.
Thanks, Best Regards
On Mon, Jun 8, 2015 at 5:45 PM, luohui20...@sina.com wrote:
Hi there, I am trying to decrease my app's running time on the worker node. I checked the log and found that the most time-consuming part is below:
15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 353.3 MB)
15/06/08 16:14:42 INFO executor.Executor:
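Akhil's reading of the logs, that the 19 seconds between 16:14:23 and 16:14:42 are spent running Task 0 rather than storing the 2.1 KB broadcast block, comes down to finding the largest pause between consecutive log lines. Below is a small JDK-only sketch of that check. It assumes each line begins with the yy/MM/dd HH:mm:ss timestamp seen in these logs, and LogGapFinder is a hypothetical name.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

public class LogGapFinder {

    // Matches the "15/06/08 16:14:23" prefix of the log lines in this thread (17 characters).
    private static final DateTimeFormatter TIMESTAMP = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss");
    private static final int TIMESTAMP_LENGTH = 17;

    static LocalDateTime timestampOf(String line) {
        return LocalDateTime.parse(line.substring(0, TIMESTAMP_LENGTH), TIMESTAMP);
    }

    /** Time elapsed between line i and line i + 1. */
    public static Duration gapAfter(List<String> lines, int i) {
        return Duration.between(timestampOf(lines.get(i)), timestampOf(lines.get(i + 1)));
    }

    /** Index of the line followed by the longest pause, or -1 if there are fewer than two lines. */
    public static int longestGapIndex(List<String> lines) {
        int best = -1;
        Duration bestGap = Duration.ZERO;
        for (int i = 0; i + 1 < lines.size(); i++) {
            Duration gap = gapAfter(lines, i);
            if (best == -1 || gap.compareTo(bestGap) > 0) {
                best = i;
                bestGap = gap;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 353.3 MB)",
            "15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 693 bytes result sent to driver",
            "15/06/08 16:14:42 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1",
            "15/06/08 16:14:56 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 693 bytes result sent to driver");
        int i = longestGapIndex(lines);
        System.out.println("Longest pause (" + gapAfter(lines, i).getSeconds() + " s) follows: " + lines.get(i));
    }
}
```

On the four lines in main it reports a 19 s pause after the "Block broadcast_0 stored as values in memory" line, that is, the time until Task 0 finishes, which fits Akhil's interpretation that the block store itself is quick.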
Re: Spark SQL with Thrift Server is very very slow and finally failing
Would you mind providing the executor output so that we can check why the executors died? You may also run EXPLAIN EXTENDED to find out the physical plan of your query, something like:
0: jdbc:hive2://localhost:1> explain extended select * from foo;
plan:
    == Parsed Logical Plan ==
    'Project [*]
    'UnresolvedRelation [foo], None
    == Analyzed Logical Plan ==
    i: string
    Project [i#6]
    Subquery foo
    Relation[i#6] org.apache.spark.sql.parquet.ParquetRelation2@517574b8
    == Optimized Logical Plan ==
    Relation[i#6] org.apache.spark.sql.parquet.ParquetRelation2@517574b8
    == Physical Plan ==
    PhysicalRDD [i#6], MapPartitionsRDD[2] at
    Code Generation: false
    == RDD ==
On 6/10/15 1:28 PM, Sourav Mazumder wrote:
From the log file I noticed that the ExecutorLostFailure happens after the memory used by the executor exceeds the executor memory value. However, even if I increase the executor memory, the executor still fails; it just takes longer. I'm wondering why an executor consumes so much memory when joining 2 Hive tables, one with 100 MB of data (around 1 M rows) and another with 20 KB of data (around 100 rows). Even if I increase the memory to 20 GB, the same failure happens.
Regards, Sourav
On Tue, Jun 9, 2015 at 12:58 PM, Sourav Mazumder sourav.mazumde...@gmail.com wrote:
Hi, I'm just running a select statement which is supposed to return 10 MB of data at most. The driver memory is 2G and the executor memory is 20G. The query I'm trying to run is something like:
SELECT PROJECT_LIVE_DT, FLOORPLAN_NM, FLOORPLAN_DB_KEY FROM POG_PRE_EXT P, PROJECT_CALENDAR_EXT C WHERE PROJECT_TYPE = 'CR'
I'm not sure what exactly you mean by physical plan. Here is the stack trace from the machine where the Thrift process is running.
Regards, Sourav
On Mon, Jun 8, 2015 at 11:18 PM, Cheng, Hao hao.ch...@intel.com wrote:
Is it a large result set returned from the Thrift Server? And can you paste the SQL and physical plan?
*From:* Ted Yu [yuzhih...@gmail.com] *Sent:* Tuesday, June 9, 2015 12:01 PM *To:* Sourav Mazumder *Cc:* user *Subject:* Re: Spark SQL with Thrift Server is very very slow and finally failing
Which Spark release are you using? Can you pastebin the stack trace w.r.t. ExecutorLostFailure? Thanks
On Mon, Jun 8, 2015 at 8:52 PM, Sourav Mazumder sourav.mazumde...@gmail.com wrote:
Hi, I am trying to run SQL from a JDBC driver using Spark's Thrift Server. I'm doing a join between a Hive table of around 100 GB and another Hive table of 10 KB, with a filter on a particular column. The query takes more than 45 minutes and then I get ExecutorLostFailure. It seems to be memory-related: once I increase the memory, the failure still happens, just after a longer time. I have executor memory 20 GB, Spark driver memory 2 GB, executor instances 2 and executor cores 2. I'm running the job on YARN with master 'yarn-client'. Any idea whether I'm missing some other configuration?
Regards, Sourav
Re: Met OOM when fetching more than 1,000,000 rows.
Hi Xiaohan, Would you please try setting spark.sql.thriftServer.incrementalCollect to true and increasing the driver memory size? With this setting, HiveThriftServer2 uses RDD.toLocalIterator rather than RDD.collect().iterator to return the result set. The key difference is that RDD.toLocalIterator retrieves a single partition at a time, thus avoiding holding the whole result set on the driver side. The memory issue happens on the driver side rather than the executor side, so tuning the executor memory size doesn't help. Cheng On 6/10/15 3:46 PM, Hester wang wrote: Hi Lian, I've run into a Spark SQL problem, and I would really appreciate it if you could give me some help! Below is a detailed description of the problem; for more information, the original code and the logs you may need are attached. Problem: I want to query my table, which is stored in Hive, through the Spark SQL JDBC interface and fetch more than 1,000,000 rows, but I get an OOM. sql = select * from TEMP_ADMIN_150601_01 limit XXX ; My Env: 5 nodes = one master + 4 workers, 1000M network switch, Redhat 6.5. Each node: 8G RAM, 500G hard disk. Java 1.6, Scala 2.10.4, Hadoop 2.6, Spark 1.3.0, Hive 0.13. Data: a table of users and their electricity charges. About 1,600,000 rows, about 28MB; each row occupies about 18 bytes. 2 columns: user_id String, total_num Double. Repro Steps: 1. Start Spark. 2. Start the Spark SQL Thrift server with the command: /usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf spark.driver.memory=2g --conf spark.shuffle.consolidateFiles=true --conf spark.shuffle.manager=sort --conf spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf spark.kryoserializer.buffer.mb=256 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar
3. Run the test code (see the attached file testHiveJDBC.java). 4. Get "OOM: GC overhead limit exceeded", "OOM: Java heap space", or "lost worker heartbeat after 120s" (see the attached logs). Preliminary diagnosis: 1. When fetching fewer than 1,000,000 rows, it always succeeds. 2. When fetching more than 1,300,000 rows, it always fails with "OOM: GC overhead limit exceeded". 3. When fetching about 1,040,000-1,200,000 rows, if I query right after the Thrift server starts up, it succeeds most of the time; if I successfully query once and then retry the same query, it fails. 4. There are 3 failure patterns: "OOM: GC overhead limit exceeded", "OOM: Java heap space", or "lost worker heartbeat after 120s". 5. I tried starting the Thrift server with different configurations, giving each worker 4G or 2G of memory, and got the same behavior. That means that, regardless of the total memory per worker, I can get fewer than 1,000,000 rows and cannot get more than 1,300,000 rows. Preliminary conclusions: 1. The total data is less than 30MB, which is very small, and there is no complex computation, so the failure should not be caused by excessive memory requirements. I suspect there is a defect in the Spark SQL code. 2. Allocating 2G or 4G of memory to each worker gives the same behavior, which strengthens my suspicion that there is a defect in the code, but I can't find the specific location. Thank you so much! Best, Xiaohan Wang
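Cheng's explanation of RDD.collect() versus RDD.toLocalIterator can be pictured with a toy model. The sketch below is plain Java with made-up partitions (the class name IncrementalCollectDemo is hypothetical, and nothing here calls Spark or HiveThriftServer2): a collect()-style driver holds every row before it returns the first one, while a partition-at-a-time iterator only ever holds the largest single partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Toy model of a driver returning query results. "Partitions" are plain lists;
// nothing here calls Spark.
final class IncrementalCollectDemo {

    // collect()-style: every partition is copied into one driver-side list,
    // so the whole result set is resident. Returns the number of rows held.
    static int peakRowsWithCollect(List<List<String>> partitions) {
        List<String> all = new ArrayList<>();
        for (List<String> p : partitions) {
            all.addAll(p);
        }
        return all.size();
    }

    // toLocalIterator()-style: fetch one partition, serve its rows, then fetch the next.
    // peak[0] records the largest number of rows held at any one time.
    static Iterator<String> localIterator(List<List<String>> partitions, int[] peak) {
        return new Iterator<String>() {
            private int nextPartition = 0;
            private List<String> buffer = new ArrayList<>();
            private int pos = 0;

            @Override
            public boolean hasNext() {
                while (pos >= buffer.size() && nextPartition < partitions.size()) {
                    // drop the previous partition and "fetch" the next one
                    buffer = new ArrayList<>(partitions.get(nextPartition++));
                    pos = 0;
                    peak[0] = Math.max(peak[0], buffer.size());
                }
                return pos < buffer.size();
            }

            @Override
            public String next() {
                if (!hasNext()) throw new NoSuchElementException();
                return buffer.get(pos++);
            }
        };
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("r1", "r2", "r3"),
            Arrays.asList("r4", "r5"),
            Arrays.asList("r6", "r7", "r8", "r9"));

        System.out.println("collect peak rows: " + peakRowsWithCollect(partitions)); // 9
        int[] peak = {0};
        int served = 0;
        for (Iterator<String> it = localIterator(partitions, peak); it.hasNext(); it.next()) {
            served++;
        }
        System.out.println("served " + served + " rows, peak rows held: " + peak[0]); // 9, 4
    }
}
```

Under this model the driver memory needed with incremental collect scales with the largest partition rather than with the full result. As far as I know, the real toLocalIterator runs a separate job per partition, so it trades driver memory for extra latency.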
Re: Monitoring Spark Jobs
Hi Sam, You might want to have a look at the Spark UI, which runs by default at localhost:8080. You can also configure Apache Ganglia to monitor your cluster resources. Thank you Regards Himanshu Mehra
Re: Reply: Re: Met OOM when fetching more than 1,000,000 rows.
Would you please also provide the executor stdout and stderr output? Thanks. Cheng On 6/10/15 4:23 PM, 姜超才 wrote: Hi Lian, Thanks for your quick response. I forgot to mention that I have already tuned driver memory from 2G to 4G, which seems to give a minor improvement: the failure mode when fetching 1,400,000 rows changed from "OOM: GC overhead limit exceeded" to "lost worker heartbeat after 120s". I will try setting spark.sql.thriftServer.incrementalCollect and continue increasing driver memory to 7G, and will send the results to you. Thanks, SuperJ - Original Message - *From:* Cheng Lian l...@databricks.com *To:* Hester wang hester9...@gmail.com, user@spark.apache.org *Subject:* Re: Met OOM when fetching more than 1,000,000 rows. *Date:* 2015/06/10 16:15:47 (Wed)
Re: PostgreSQL JDBC Classpath Issue
Michael has answered this question in the SO thread http://stackoverflow.com/a/30226336 Cheng On 6/10/15 9:24 PM, shahab wrote: Hi George, I have the same issue; did you manage to find a solution? best, /Shahab On Wed, May 13, 2015 at 9:21 PM, George Adams g.w.adams...@gmail.com mailto:g.w.adams...@gmail.com wrote: Hey all, I seem to be having an issue with the PostgreSQL JDBC jar on my classpath. I’ve outlined the issue on Stack Overflow (http://stackoverflow.com/questions/30221677/spark-sql-postgresql-data-source-issues). I’m not sure how to fix this, since I built the uber jar using sbt-assembly and the final jar does contain org/postgresql/Driver.class. -- George Adams, IV Software Craftsman Brand Networks, Inc. (585) 902-8822
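The linked answer is the place to look for the actual fix. As a complement, here is a small diagnostic sketch in plain Java (no Spark; JdbcDriverCheck is a hypothetical helper) that compares two views from inside the running code: whether the driver class can be loaded at all, and which drivers java.sql.DriverManager reports. As far as I know, DriverManager only returns drivers whose class is visible to the caller's class loader, so a class that loads but never shows up in the second list can point at a class-loader visibility problem rather than a missing jar.

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Diagnostic sketch: which JDBC drivers can java.sql.DriverManager see from here?
final class JdbcDriverCheck {

    // True if the class can be loaded by the current thread's context class loader
    // (static initializers are not run).
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, Thread.currentThread().getContextClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    // Class names of the drivers DriverManager reports as available to this caller.
    static List<String> registeredDrivers() {
        List<String> names = new ArrayList<>();
        for (Driver d : Collections.list(DriverManager.getDrivers())) {
            names.add(d.getClass().getName());
        }
        return names;
    }

    public static void main(String[] args) {
        String pg = "org.postgresql.Driver"; // class name taken from the thread
        System.out.println(pg + " loadable: " + isLoadable(pg));
        System.out.println("DriverManager sees: " + registeredDrivers());
    }
}
```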
Re: append file on hdfs
Hi, if you want to write one file per partition, that's actually built into Spark as *saveAsTextFile*(*path*): "Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file." On Wed, Jun 10, 2015 at 4:44 AM, Pa Rö paul.roewer1...@googlemail.com wrote: Hi, I have an idea to solve my problem: I want to write one file for each Spark partition, but I don't know how to get the actual partition suffix/ID in my call function:
points.foreachPartition(
    new VoidFunction<Iterator<Tuple2<Integer, GeoTimeDataTupel>>>() {
        private static final long serialVersionUID = -7210897529331503565L;

        public void call(Iterator<Tuple2<Integer, GeoTimeDataTupel>> entry) throws Exception {
            while (entry.hasNext()) {
                Tuple2<Integer, GeoTimeDataTupel> temp = entry.next();
                try {
                    FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
                    Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
);
2015-06-09 15:34 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com: Hi community, I want to append results to one file. If I work locally, my function builds everything correctly, but if I run this on a YARN cluster, I lose some rows.
Here is my function to write:
points.foreach(
    new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() {
        private static final long serialVersionUID = 2459995649387229261L;

        public void call(Tuple2<Integer, GeoTimeDataTupel> entry) throws Exception {
            try {
                FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
                Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
                if (fs.exists(pt)) {
                    FSDataInputStream in = fs.open(pt);
                    Path pt_temp = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results_temp");
                    backup(fs.getConf(), fs, in, pt_temp);
                    in.close();
                    FSDataOutputStream out = fs.create((pt), true);
                    FSDataInputStream backup = fs.open(pt_temp);
                    int offset = 0;
                    int bufferSize = 4096;
                    int result = 0;
                    byte[] buffer = new byte[bufferSize];
                    // pre read a part of content from input stream
                    result = backup.read(offset, buffer, 0, bufferSize);
                    // loop read input stream until it does not fill whole size of buffer
                    while (result == bufferSize) {
                        out.write(buffer);
                        // read next segment from input stream by moving the offset pointer
                        offset += bufferSize;
                        result = backup.read(offset, buffer, 0, bufferSize);
                    }
                    if (result > 0 && result < bufferSize) {
                        for (int i = 0; i < result; i++) {
                            out.write(buffer[i]);
                        }
                    }
                    out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
                    out.close();
                } else {
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt)));
                    bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
                    bw.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void backup(Configuration conf, FileSystem fs, FSDataInputStream sourceContent, Path pt_temp) throws Exception {
            FSDataOutputStream out = fs.create(pt_temp, true);
            IOUtils.copyBytes(sourceContent, out, 4096, false);
            out.close();
        }
Where is my mistake? Or is there a function to write (append) to HDFS? Best regards, Paul
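One plausible reason rows go missing on YARN but not locally: for every element, Paul's function reads the old file, copies it, and rewrites it with one extra line. When several tasks run at the same time on the cluster, two of them can read the same old contents before either one writes. The toy model below (plain Java, an in-memory Map standing in for HDFS; AppendRaceDemo is a hypothetical name) replays one such interleaving deterministically and contrasts it with writing one file per partition, the approach suggested in the reply. The part-file naming is only Hadoop-style for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: a Map stands in for HDFS (path -> file contents); no Hadoop APIs are used.
final class AppendRaceDemo {

    // Two tasks each "read the old contents, then rewrite the file as old + my line",
    // with both reads happening before either write. Returns the final file contents.
    static String interleavedReadModifyWrite(String initial, String lineA, String lineB) {
        Map<String, String> fs = new HashMap<>();
        fs.put("results", initial);
        String seenByA = fs.get("results"); // task A reads
        String seenByB = fs.get("results"); // task B reads the same old contents
        fs.put("results", seenByA + lineA); // task A rewrites the file
        fs.put("results", seenByB + lineB); // task B overwrites it, so lineA is lost
        return fs.get("results");
    }

    // Alternative: every partition writes its own file, keyed by partition index,
    // so no two tasks ever write the same path.
    static Map<String, String> onePartFilePerPartition(List<List<String>> partitions) {
        Map<String, String> fs = new TreeMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            StringBuilder sb = new StringBuilder();
            for (String line : partitions.get(i)) {
                sb.append(line).append('\n');
            }
            fs.put(String.format("results/part-%05d", i), sb.toString());
        }
        return fs;
    }

    public static void main(String[] args) {
        // prints only "Cluster: 2, Point: b"
        System.out.print(interleavedReadModifyWrite("", "Cluster: 1, Point: a\n", "Cluster: 2, Point: b\n"));
        List<List<String>> parts = Arrays.asList(
            Arrays.asList("Cluster: 1, Point: a"),
            Arrays.asList("Cluster: 2, Point: b"));
        // prints [results/part-00000, results/part-00001]
        System.out.println(onePartFilePerPartition(parts).keySet());
    }
}
```

Locally the tasks may simply not overlap in time, which would hide the problem; this sketch shows that the pattern is unsafe, not that it is the only cause of the lost rows.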
Fully in-memory shuffles
Is it possible to configure Spark to do all of its shuffling FULLY in memory (given that I have enough memory to store all the data)?
Re: Determining number of executors within RDD
Note that this property is only available for YARN.
Re: Fully in-memory shuffles
There's a discussion of this at https://github.com/apache/spark/pull/5403 On Wed, Jun 10, 2015 at 7:08 AM, Corey Nolet cjno...@gmail.com wrote: Is it possible to configure Spark to do all of its shuffling FULLY in memory (given that I have enough memory to store all the data)?
Re: Cassandra Submit
Do you build via Maven or sbt? How do you submit your application -- do you use local, standalone or Mesos/YARN? Your jars as you originally listed them seem right to me. Try this, from your ${SPARK_HOME}:
SPARK_CLASSPATH=spark-cassandra-connector_2.10-1.3.0-M1.jar:guava-jdk5-14.0.1.jar:cassandra-driver-core-2.1.5.jar:cassandra-thrift-2.1.3.jar:joda-time-2.3.jar bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1
where you'd have to provide the correct paths to the jars you're using. This will drop you into a spark-shell:
import com.datastax.spark.connector._
val test = sc.cassandraTable("your_keyspace", "your_columnfamily")
test.first
I would first try to get this running in local mode, and if all works well, start looking at the jar you're distributing via spark-submit and the classpaths of your executors (this collection of jars does work for me, by the way, so the Cassandra jars shown definitely work well with Spark 1.3.1). On Wed, Jun 10, 2015 at 2:53 AM, Yasemin Kaya godo...@gmail.com wrote: It is really hell. How can I know which jars match? Which version of the assembly fits me? 2015-06-10 0:59 GMT+03:00 Mohammed Guller moham...@glassbeam.com: Looks like the real culprit is a library version mismatch:
Caused by: java.lang.NoSuchMethodError: org.apache.cassandra.thrift.TFramedTransportFactory.openTransport(Ljava/lang/String;I)Lorg/apache/thrift/transport/TTransport;
    at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createThriftClient(CassandraConnectionFactory.scala:41)
    at com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:134)
    ... 28 more
The Spark Cassandra Connector is trying to use a method that does not exist. That means your assembly jar has the wrong version of the library that SCC is trying to use. Welcome to jar hell!
Mohammed *From:* Yasemin Kaya [mailto:godo...@gmail.com] *Sent:* Tuesday, June 9, 2015 12:24 PM *To:* Mohammed Guller *Cc:* Yana Kadiyska; Gerard Maas; user@spark.apache.org *Subject:* Re: Cassandra Submit My code https://gist.github.com/yaseminn/d77dd9baa6c3c43c7594 and exception https://gist.github.com/yaseminn/fdd6e5a6efa26219b4d3. And:
~/cassandra/apache-cassandra-2.1.5$ bin/cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 2.1.5 | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh> use test;
cqlsh:test> select * from people;

 id | name
----+---------
  5 | eslem
  1 | yasemin
  8 | ali
  2 | busra
  4 | ilham
  7 | kubra
  6 | tuba
  9 | aslı
  3 | Andrew

(9 rows)
cqlsh:test>
bin/cassandra-cli -h 127.0.0.1 -p 9160
Connected to: Test Cluster on 127.0.0.1/9160 Welcome to Cassandra CLI version 2.1.5 The CLI is deprecated and will be removed in Cassandra 3.0. Consider migrating to cqlsh. CQL is fully backwards compatible with Thrift data; see http://www.datastax.com/dev/blog/thrift-to-cql3 Type 'help;' or '?' for help. Type 'quit;' or 'exit;' to quit. [default@unknown] yasemin 2015-06-09 22:03 GMT+03:00 Mohammed Guller moham...@glassbeam.com: It is strange that writes work but reads do not. If it were a Cassandra connectivity issue, then neither writes nor reads would work. Perhaps the problem is somewhere else. Can you send the complete exception trace? Also, just to make sure that there is no DNS issue, try this: ~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h 127.0.0.1 -p 9160 Mohammed *From:* Yasemin Kaya [mailto:godo...@gmail.com] *Sent:* Tuesday, June 9, 2015 11:32 AM *To:* Yana Kadiyska *Cc:* Gerard Maas; Mohammed Guller; user@spark.apache.org *Subject:* Re: Cassandra Submit I removed the core and streaming jars, and the exception is still the same.
I tried what you said; here are the results: ~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h localhost -p 9160 Connected to: Test Cluster on localhost/9160 Welcome to Cassandra CLI version 2.1.5 The CLI is deprecated and will be removed in Cassandra 3.0. Consider migrating to cqlsh. CQL is fully backwards compatible with Thrift data; see http://www.datastax.com/dev/blog/thrift-to-cql3 Type 'help;' or '?' for help. Type 'quit;' or 'exit;' to quit. [default@unknown] and ~/cassandra/apache-cassandra-2.1.5$ bin/cqlsh Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 5.0.1 | Cassandra 2.1.5 | CQL spec 3.2.0 | Native protocol v3] Use HELP for help. cqlsh> Thank you for your kind responses ... 2015-06-09 20:59 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: Hm, the jars look OK, although it's a bit of a mess -- you have spark-assembly 1.3.0 but then core and streaming 1.3.1... It's generally a bad idea to mix versions. Spark-assembly bundles all Spark packages, so either include them separately or use spark-assembly, but don't mix them as you've shown.
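Mohammed's diagnosis (a NoSuchMethodError because the wrong version of a class wins on the class path) can be checked directly from the running JVM. A minimal diagnostic sketch in plain Java: JarHellCheck is a hypothetical helper, and the class and method names in main are copied from the stack trace above. It reports which jar a class was actually loaded from and whether that class has the expected method; note that getMethod does not check the return type, which is also part of the signature in the NoSuchMethodError message.

```java
import java.net.URL;
import java.security.CodeSource;

// Diagnostic sketch for NoSuchMethodError / "jar hell": which jar did a class come from,
// and does it have the method signature a caller expects?
final class JarHellCheck {

    // Load without running static initializers, so a half-broken class path
    // does not trigger side effects.
    private static Class<?> load(String className) throws ClassNotFoundException {
        return Class.forName(className, false, JarHellCheck.class.getClassLoader());
    }

    // Location (jar or directory) the class was loaded from, or a note if unavailable.
    // Classes from the JDK itself normally have no code source.
    static String loadedFrom(String className) {
        try {
            CodeSource src = load(className).getProtectionDomain().getCodeSource();
            URL loc = (src == null) ? null : src.getLocation();
            return (loc == null) ? "(JDK / bootstrap class path)" : loc.toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable)";
        }
    }

    // True if the class has a public method (declared or inherited) with exactly
    // these parameter types. The return type is NOT checked.
    static boolean hasMethod(String className, String method, Class<?>... paramTypes) {
        try {
            load(className).getMethod(method, paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String cls = "org.apache.cassandra.thrift.TFramedTransportFactory";
        System.out.println("loaded from: " + loadedFrom(cls));
        System.out.println("has openTransport(String, int): "
            + hasMethod(cls, "openTransport", String.class, int.class));
    }
}
```

Running the same check on the driver and inside an executor task would show whether both sides see the same jar.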
Re: which database for gene alignment data ?
Hi Roni, These are exposed as public APIs. If you want, you can run them inside the adam-shell (which is just a wrapper for the Spark shell, but with the ADAM libraries on the class path). Also, I need to save all my intermediate data. Seems like ADAM stores data in Parquet on HDFS. I want to save something in an external database, so that we can re-use the saved data in multiple ways by multiple people. The Parquet data can be accessed via Hive, Spark SQL, Impala, etc. Additionally, from ADAM, you can export most data out to legacy genomics formats. I’m not sure, though, whether we support that right now for feature data; those are fairly new. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Jun 9, 2015, at 9:21 PM, roni roni.epi...@gmail.com wrote: Hi Frank, Thanks for the reply. I downloaded ADAM and built it, but it does not seem to list this function among its command-line options. Are these exposed as a public API that I can call from code? Also, I need to save all my intermediate data. Seems like ADAM stores data in Parquet on HDFS. I want to save something in an external database, so that we can re-use the saved data in multiple ways by multiple people. Any suggestions on the DB selection or on keeping data centralized for use by multiple distinct groups? Thanks -Roni On Mon, Jun 8, 2015 at 12:47 PM, Frank Austin Nothaft fnoth...@berkeley.edu wrote: Hi Roni, We have a full suite of genomic feature parsers that can read BED, narrowPeak, GATK interval lists, and GTF/GFF into Spark RDDs in ADAM. Additionally, we have support for efficient overlap joins (query 3 in your email below). You can load the genomic features with ADAMContext.loadFeatures. We have two tools for the overlap computation: you can use a BroadcastRegionJoin if one of the datasets you want to overlap is small, or a ShuffleRegionJoin if both datasets are large.
Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Jun 8, 2015, at 9:39 PM, roni roni.epi...@gmail.com wrote: Sorry for the delay. The files (called .bed files) have a format like:
Chromosome  start   end     feature  score  strand
chr1        713776  714375  peak.1   599    +
chr1        752401  753000  peak.2   599    +
The mandatory fields are: chrom - The name of the chromosome (e.g. chr3, chrY, chr2_random) or scaffold (e.g. scaffold10671). chromStart - The starting position of the feature in the chromosome or scaffold. The first base in a chromosome is numbered 0. chromEnd - The ending position of the feature in the chromosome or scaffold. The chromEnd base is not included in the display of the feature. For example, the first 100 bases of a chromosome are defined as chromStart=0, chromEnd=100, and span the bases numbered 0-99. There can be more data, as described at https://genome.ucsc.edu/FAQ/FAQformat.html#format1 Typical use cases are: 1. Find the features between given start and end positions. 2. Find features whose start and end points overlap with another feature. 3. Read external (reference) data, which will have a similar format (chr10 48514785 49604641 MAPK8 49514785 +), and find all the data points that overlap with the other .bed files. The data is huge: .bed files can range from 0.5 GB to 5 GB (or more). I was thinking of using Cassandra, but I'm not sure whether the overlapping queries can be supported and will be fast enough. Thanks for the help -Roni On Sat, Jun 6, 2015 at 7:03 AM, Ted Yu yuzhih...@gmail.com wrote: Can you describe your use case in a bit more detail, since not all people on this mailing list are familiar with gene sequencing alignment data? Thanks On Fri, Jun 5, 2015 at 11:42 PM, roni roni.epi...@gmail.com wrote: I want to use Spark to read compressed .bed files containing gene sequencing alignment data.
I want to store the .bed file data in a database and then use external gene expression data to find overlaps, etc. Which database is best for this? Thanks -Roni
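The BED semantics Roni quotes (0-based chromStart, chromEnd excluded) amount to half-open intervals, and use cases 1 and 3 both reduce to a single overlap test. A small plain-Java sketch: BedOverlap and its methods are hypothetical names, not ADAM's API, and the nested-loop join is only a naive single-machine stand-in for the region joins Frank mentions.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of BED-style interval logic: start is 0-based and inclusive, end is exclusive.
// Not ADAM's API; all names are made up for illustration.
final class BedOverlap {

    static final class Feature {
        final String chrom;
        final long start; // 0-based, inclusive
        final long end;   // exclusive
        final String name;

        Feature(String chrom, long start, long end, String name) {
            this.chrom = chrom;
            this.start = start;
            this.end = end;
            this.name = name;
        }
    }

    // Parse the first four whitespace-separated BED columns (chrom, start, end, name).
    static Feature parse(String line) {
        String[] f = line.trim().split("\\s+");
        return new Feature(f[0], Long.parseLong(f[1]), Long.parseLong(f[2]), f.length > 3 ? f[3] : "");
    }

    // Half-open intervals [s1, e1) and [s2, e2) on the same chromosome overlap
    // iff s1 < e2 and s2 < e1.
    static boolean overlaps(Feature a, Feature b) {
        return a.chrom.equals(b.chrom) && a.start < b.end && b.start < a.end;
    }

    // Use case 1, read as "features overlapping the window [start, end) on chrom".
    // For strict containment, test f.start >= start && f.end <= end instead.
    static List<Feature> inWindow(List<Feature> features, String chrom, long start, long end) {
        Feature window = new Feature(chrom, start, end, "window");
        List<Feature> out = new ArrayList<>();
        for (Feature f : features) {
            if (overlaps(f, window)) {
                out.add(f);
            }
        }
        return out;
    }

    // Use case 3, naive version: every (reference, feature) pair that overlaps.
    static List<String> overlapJoin(List<Feature> reference, List<Feature> features) {
        List<String> pairs = new ArrayList<>();
        for (Feature r : reference) {
            for (Feature f : features) {
                if (overlaps(r, f)) {
                    pairs.add(r.name + "~" + f.name);
                }
            }
        }
        return pairs;
    }
}
```

Because end is exclusive, a window that ends exactly where a feature starts does not overlap it; getting this boundary right matters for any database or index chosen for the overlap queries.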
Re: Spark SQL with Thrift Server is very very slow and finally failing
Here is the physical plan. I'm also attaching the executor log from one of the executors. You can see that memory consumption rises slowly until it reaches around 10.5 GB. It stays there for around 5 minutes (06-50-36 to 06-55-00), and then this executor gets killed. The configured executor memory is 10GB. Regards, Sourav
--- plan --
== Parsed Logical Plan ==
'Project ['IKB_PROJECT_LIVE_DT,'FLOORPLAN_NM,'FLOORPLAN_DBKEY]
 'Filter ('IKB_PROJECT_TYPE = CR)
  'Join Inner, None
   'UnresolvedRelation [IKB_FP_POG_PRE_EXT], Some(P)
   'UnresolvedRelation [IKB_PROJECT_CALENDAR_EXT], Some(C)

== Analyzed Logical Plan ==
Project [IKB_PROJECT_LIVE_DT#31,FLOORPLAN_NM#20,FLOORPLAN_DBKEY#17]
 Filter (IKB_PROJECT_TYPE#29 = CR)
  Join Inner, None
   MetastoreRelation sourav_ikb_hs, ikb_fp_pog_pre_ext, Some(P)
   MetastoreRelation sourav_ikb_hs, ikb_project_calendar_ext, Some(C)

== Optimized Logical Plan ==
Project [IKB_PROJECT_LIVE_DT#31,FLOORPLAN_NM#20,FLOORPLAN_DBKEY#17]
 Join Inner, None
  Project [FLOORPLAN_NM#20,FLOORPLAN_DBKEY#17]
   MetastoreRelation sourav_ikb_hs, ikb_fp_pog_pre_ext, Some(P)
  Project [IKB_PROJECT_LIVE_DT#31]
   Filter (IKB_PROJECT_TYPE#29 = CR)
    MetastoreRelation sourav_ikb_hs, ikb_project_calendar_ext, Some(C)

== Physical Plan ==
Project [IKB_PROJECT_LIVE_DT#31,FLOORPLAN_NM#20,FLOORPLAN_DBKEY#17]
 CartesianProduct
  HiveTableScan [FLOORPLAN_NM#20,FLOORPLAN_DBKEY#17], (MetastoreRelation sourav_ikb_hs, ikb_fp_pog_pre_ext, Some(P)), None
  Project [IKB_PROJECT_LIVE_DT#31]
   Filter (IKB_PROJECT_TYPE#29 = CR)
    HiveTableScan [IKB_PROJECT_LIVE_DT#31,IKB_PROJECT_TYPE#29], (MetastoreRelation sourav_ikb_hs, ikb_project_calendar_ext, Some(C)), None
Code Generation: false
== RDD ==
---
On Wed, Jun 10, 2015 at 12:59 AM, Cheng Lian lian.cs@gmail.com wrote: Would you mind providing the executor output so that we can check why the executors died?
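The optimized plan Sourav posted shows `Join Inner, None`, i.e. there is no join condition between P and C, and the physical plan executes it as a CartesianProduct. The output of a cross join is simply every left row paired with every right row, so its size is the product of the input row counts. The snippet below (plain Java, hypothetical class name CrossJoinSize) does that arithmetic with the approximate counts from the thread, assuming as an upper bound that all ~100 rows of the small table pass PROJECT_TYPE = 'CR'.

```java
// Size of a cross join (no join condition): every left row pairs with every right row.
final class CrossJoinSize {

    // Formula version; multiplyExact throws instead of silently overflowing.
    static long crossJoinRows(long leftRows, long rightRowsAfterFilter) {
        return Math.multiplyExact(leftRows, rightRowsAfterFilter);
    }

    // Literal version on small inputs, to confirm the formula.
    static long countPairs(int left, int right) {
        long n = 0;
        for (int i = 0; i < left; i++) {
            for (int j = 0; j < right; j++) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        // ~1 M rows in P, ~100 rows in C (counts quoted in the thread; upper bound on the filter)
        System.out.println(crossJoinRows(1_000_000L, 100L)); // 100000000
    }
}
```

A result on the order of 10^8 rows would be far more than the ~10 MB Sourav expected, which may be worth checking before tuning memory further.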
Re: spark uses too much memory maybe (binaryFiles() with more than 1 million files in HDFS), groupBy or reduceByKey()
I am profiling the driver. It currently has 564MB of strings, which might be the 1 million file names. But it also has 2.34 GB of long[]! That's so far; it is still running. What are those long[] used for?
Re: Spark SQL with Thrift Server is very very slow and finally failing
It seems that Spark SQL can't retrieve table size statistics and doesn't enable broadcast join in your case. Would you please try `ANALYZE TABLE table-name` for both tables to generate table statistics? Cheng On 6/10/15 10:26 PM, Sourav Mazumder wrote: Here is the physical plan. I'm also attaching the executor log from one of the executors. You can see that memory consumption rises slowly until it reaches around 10.5 GB. It stays there for around 5 minutes (06-50-36 to 06-55-00), and then this executor gets killed. The configured executor memory is 10GB. Regards, Sourav
--- plan --
== Parsed Logical Plan ==
'Project ['IKB_PROJECT_LIVE_DT,'FLOORPLAN_NM,'FLOORPLAN_DBKEY]
 'Filter ('IKB_PROJECT_TYPE = CR)
  'Join Inner, None
   'UnresolvedRelation [IKB_FP_POG_PRE_EXT], Some(P)
   'UnresolvedRelation [IKB_PROJECT_CALENDAR_EXT], Some(C)

== Analyzed Logical Plan ==
Project [IKB_PROJECT_LIVE_DT#31,FLOORPLAN_NM#20,FLOORPLAN_DBKEY#17]
 Filter (IKB_PROJECT_TYPE#29 = CR)
  Join Inner, None
   MetastoreRelation sourav_ikb_hs, ikb_fp_pog_pre_ext, Some(P)
   MetastoreRelation sourav_ikb_hs, ikb_project_calendar_ext, Some(C)

== Optimized Logical Plan ==
Project [IKB_PROJECT_LIVE_DT#31,FLOORPLAN_NM#20,FLOORPLAN_DBKEY#17]
 Join Inner, None
  Project [FLOORPLAN_NM#20,FLOORPLAN_DBKEY#17]
   MetastoreRelation sourav_ikb_hs, ikb_fp_pog_pre_ext, Some(P)
  Project [IKB_PROJECT_LIVE_DT#31]
   Filter (IKB_PROJECT_TYPE#29 = CR)
    MetastoreRelation sourav_ikb_hs, ikb_project_calendar_ext, Some(C)

== Physical Plan ==
Project [IKB_PROJECT_LIVE_DT#31,FLOORPLAN_NM#20,FLOORPLAN_DBKEY#17]
 CartesianProduct
  HiveTableScan [FLOORPLAN_NM#20,FLOORPLAN_DBKEY#17], (MetastoreRelation sourav_ikb_hs, ikb_fp_pog_pre_ext, Some(P)), None
  Project [IKB_PROJECT_LIVE_DT#31]
   Filter (IKB_PROJECT_TYPE#29 = CR)
    HiveTableScan [IKB_PROJECT_LIVE_DT#31,IKB_PROJECT_TYPE#29], (MetastoreRelation sourav_ikb_hs, ikb_project_calendar_ext, Some(C)), None
Code Generation: false
== RDD ==
---
On Wed, Jun 10, 2015 at 12:59 AM, Cheng Lian
lian.cs@gmail.com mailto:lian.cs@gmail.com wrote: Would you mind providing the executor output so that we can check why the executors died? You may also run EXPLAIN EXTENDED to find out the physical plan of your query, something like:
0: jdbc:hive2://localhost:1 explain extended select * from foo;
+-+
| plan |
+-+
| == Parsed Logical Plan == |
| 'Project [*] |
| 'UnresolvedRelation [foo], None |
| |
| == Analyzed Logical Plan == |
| i: string |
| Project [i#6] |
| Subquery foo |
| Relation[i#6] org.apache.spark.sql.parquet.ParquetRelation2@517574b8 |
| |
| == Optimized Logical Plan == |
| Relation[i#6] org.apache.spark.sql.parquet.ParquetRelation2@517574b8 |
| |
| == Physical Plan == |
| PhysicalRDD [i#6], MapPartitionsRDD[2] at |
| |
| Code Generation: false |
| == RDD == |
+-+
On 6/10/15 1:28 PM, Sourav Mazumder wrote: From the log file I noticed that the ExecutorLostFailure happens after the memory used by the executor exceeds the executor memory value. However, even if I increase the executor memory, the executor still fails; it just takes longer. I'm wondering why, for joining 2 Hive tables, one with 100 MB of data (around 1 M rows) and another with 20 KB of data (around 100 rows), an executor consumes so much memory. Even if I increase the memory to 20 GB, the same failure happens. Regards, Sourav On Tue, Jun 9, 2015 at 12:58 PM,
Re: Linear Regression with SGD
It's always better to use a quasi-Newton solver if the runtime and problem scale permit, as there are guarantees on optimization... OWL-QN and BFGS are both quasi-Newton methods. Most single-node code bases will run quasi-Newton solves. If you are using SGD, it is better to use AdaDelta/AdaGrad or similar tricks... David added some of them in Breeze recently... On Jun 9, 2015 7:25 PM, DB Tsai dbt...@dbtsai.com wrote: As Robin suggested, you may try the following new implementation. https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef Thanks. Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D https://pgp.mit.edu/pks/lookup?search=0x59DF55B8AF08DF8D On Tue, Jun 9, 2015 at 3:22 PM, Robin East robin.e...@xense.co.uk wrote: Hi Stephen, How many is a very large number of iterations? SGD is notorious for requiring 100s or 1000s of iterations; you may also need to spend some time tweaking the step size. In 1.4 there is an implementation of ElasticNet linear regression which is supposed to compare favourably with an equivalent R implementation. On 9 Jun 2015, at 22:05, Stephen Carman scar...@coldlight.com wrote: Hi user group, We are using Spark Linear Regression with SGD as the optimization technique and we are getting very sub-optimal results. Can anyone shed some light on why this implementation seems to produce such poor results compared to our own implementation? We are using a very small dataset, but we have to use a very large number of iterations to achieve results similar to our implementation; we’ve tried normalizing the data, not normalizing the data, and tuning every parameter. Our implementation is a closed-form solution, so we are guaranteed convergence, while the Spark one is not, which is understandable, but why is it so far off? Has anyone experienced this? Steve Carman, M.S.
Artificial Intelligence Engineer Coldlight-PTC scar...@coldlight.com
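Robin's point (gradient methods need many iterations and a well-chosen step size, while a closed-form solve does not) is easy to see on a tiny made-up problem. This sketch uses one feature and full-batch gradient descent as a deterministic stand-in for SGD; LeastSquaresDemo is a hypothetical name and this is not MLlib's LinearRegressionWithSGD.

```java
import java.util.Arrays;

// Toy comparison on y = a + b*x: closed-form least squares vs. plain gradient descent.
final class LeastSquaresDemo {

    // Closed form: b = cov(x, y) / var(x), a = mean(y) - b * mean(x).
    static double[] closedForm(double[] x, double[] y) {
        int n = x.length;
        double mx = 0, my = 0;
        for (int i = 0; i < n; i++) {
            mx += x[i];
            my += y[i];
        }
        mx /= n;
        my /= n;
        double sxy = 0, sxx = 0;
        for (int i = 0; i < n; i++) {
            sxy += (x[i] - mx) * (y[i] - my);
            sxx += (x[i] - mx) * (x[i] - mx);
        }
        double b = sxy / sxx;
        return new double[] {my - b * mx, b};
    }

    // Full-batch gradient descent on mean squared error, starting from (a, b) = (0, 0).
    static double[] gradientDescent(double[] x, double[] y, double step, int iterations) {
        int n = x.length;
        double a = 0, b = 0;
        for (int it = 0; it < iterations; it++) {
            double ga = 0, gb = 0;
            for (int i = 0; i < n; i++) {
                double err = (a + b * x[i]) - y[i];
                ga += err;
                gb += err * x[i];
            }
            // gradient of (1/n) * sum(err^2) is (2/n) * (sum err, sum err*x)
            a -= step * (2.0 / n) * ga;
            b -= step * (2.0 / n) * gb;
        }
        return new double[] {a, b};
    }

    public static void main(String[] args) {
        double[] x = {0, 1, 2, 3, 4};
        double[] y = {1, 3, 5, 7, 9}; // made-up data, exactly y = 1 + 2x
        System.out.println(Arrays.toString(closedForm(x, y)));                  // [1.0, 2.0]
        System.out.println(Arrays.toString(gradientDescent(x, y, 0.05, 10)));   // still far off
        System.out.println(Arrays.toString(gradientDescent(x, y, 0.05, 1000))); // close to [1, 2]
        System.out.println(Arrays.toString(gradientDescent(x, y, 0.2, 50)));    // step too large: diverges
    }
}
```

Even on noise-free data, the fixed-step method needs hundreds of iterations here and blows up if the step is too large; quasi-Newton methods such as L-BFGS use curvature information to avoid much of this tuning.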
Re: Saving compressed textFiles from a DStream in Scala
Thanks Akhil. For posterity, I ended up with: https://gist.github.com/dokipen/aa07f351a970fe54fcff I couldn't get rddToFileName() to work, but its implementation was pretty simple. I'm a poet but I don't know it. On Tue, Jun 9, 2015 at 3:10 AM Akhil Das ak...@sigmoidanalytics.com wrote: Like this? myDStream.foreachRDD(rdd => rdd.saveAsTextFile("/sigmoid/", codec)) Thanks Best Regards On Mon, Jun 8, 2015 at 8:06 PM, Bob Corsaro rcors...@gmail.com wrote: It looks like saveAsTextFiles doesn't support the compression parameter of RDD.saveAsTextFile. Is there a way to add the functionality in my client code without patching Spark? I tried making my own saveFunc function and calling DStream.foreachRDD, but ran into trouble with invoking rddToFileName and making the RDD type parameter work properly. It's probably just due to my lack of Scala knowledge. Can anyone give me a hand? def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsTextFile(file) } this.foreachRDD(saveFunc) }
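As I understand it, rddToFileName is private to Spark's streaming package, which would explain why it could not be called from client code. Reimplementing its naming rule, as the thread ended up doing, is the simple route. The sketch below is plain Python, not Spark. rdd_to_file_name is a hypothetical re-implementation of what I believe the Spark 1.x rule to be: prefix-TIME_IN_MS, plus .suffix when a suffix is given. save_batch_gzipped writes one gzip file per batch rather than Spark's directory of part files. Treat both as assumptions to check against your Spark version.

```python
import gzip
import os
import tempfile


def rdd_to_file_name(prefix, suffix, time_ms):
    """Hypothetical re-implementation of the per-batch name: prefix-TIME_MS[.suffix]."""
    if prefix is None:
        return str(time_ms)
    if not suffix:
        return f"{prefix}-{time_ms}"
    return f"{prefix}-{time_ms}.{suffix}"


def save_batch_gzipped(lines, out_dir, prefix, suffix, time_ms):
    """Write one batch of text lines to a single gzip file named after the batch time."""
    path = os.path.join(out_dir, rdd_to_file_name(prefix, suffix, time_ms) + ".gz")
    with gzip.open(path, "wt", encoding="utf-8") as handle:
        for line in lines:
            handle.write(line + "\n")
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out_dir:
        print(save_batch_gzipped(["a", "b"], out_dir, "events", "txt", 1433894400000))
```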
How to use Apache spark mllib Model output in C++ component
There is a C++ component which uses a model that we want to replace with the Spark model output, but there is no C++ API support for reading the model. What is the best way to solve this problem?
Re: Kafka Spark Streaming: ERROR EndpointWriter: dropping message
Thank you for responding @nsalian. 1. I am trying to replicate this https://github.com/dibbhatt/kafka-spark-consumer project on my local system. 2. Yes, Kafka and the brokers are on the same host. 3. I am working with Kafka 0.7.3 and Spark 1.3.1. Kafka 0.7.3 does not have the --describe command. I've tried three cases (Kafka and ZooKeeper were on my machine all the time): (i) producer and consumer on my machine; (ii) producer on my machine and consumer on a different machine; (iii) consumer on my machine and producer on a different machine. All three cases worked properly.
Re: Kafka Spark Streaming: ERROR EndpointWriter: dropping message
Hi, Can you please share a more detailed stack trace from your receiver logs, and also the consumer settings you used? I have never tested the consumer with Kafka 0.7.3, so I'm not sure whether the Kafka version is the issue. Have you tried building the consumer using Kafka 0.7.3? Regards, Dibyendu On Wed, Jun 10, 2015 at 11:52 AM, karma243 ashut...@reducedata.com wrote: Thank you for responding @nsalian. 1. I am trying to replicate this https://github.com/dibbhatt/kafka-spark-consumer project on my local system. 2. Yes, Kafka and the brokers are on the same host. 3. I am working with Kafka 0.7.3 and Spark 1.3.1. Kafka 0.7.3 does not have the --describe command. I've tried three cases (Kafka and ZooKeeper were on my machine all the time): (i) producer and consumer on my machine; (ii) producer on my machine and consumer on a different machine; (iii) consumer on my machine and producer on a different machine. All three cases worked properly.
Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS
Or you can do sc.addJar("/path/to/the/jar"). I haven't tested it with an HDFS path, though it works fine with a local path. Thanks Best Regards On Wed, Jun 10, 2015 at 10:17 AM, Jörn Franke jornfra...@gmail.com wrote: I am not sure they work with HDFS paths. You may want to look at the source code. Alternatively, you can create a fat jar containing all the jars (let your build tool set META-INF correctly). This always works. On Wed, Jun 10, 2015 at 6:22, Dong Lei dong...@microsoft.com wrote: Thanks so much! I did put a sleep in my code to keep the UI available. Now from the UI, I can see: · In the "Spark Properties" section, spark.jars and spark.files are set as I want. · In the "Classpath Entries" section, my jar and file paths are there (with an HDFS path). And I checked the HTTP file server directory; the structure is like: D:\data\temp \ --spark-UUID \-- httpd-UUID \jars [*empty*] \files [*empty*] So I guess the files and jars are not properly downloaded from HDFS to these folders? I'm using standalone mode. Any ideas? Thanks Dong Lei *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Tuesday, June 9, 2015 4:46 PM *To:* Dong Lei *Cc:* user@spark.apache.org *Subject:* Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS You can put a Thread.sleep(10) in the code to keep the UI available for quite some time. (Put it just before starting any of your transformations.) Or you can enable the Spark history server https://spark.apache.org/docs/latest/monitoring.html too. I believe --jars https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management would download the dependency jars on all your worker machines (they can be found in the Spark work dir of your application, along with the stderr and stdout files). Thanks Best Regards On Tue, Jun 9, 2015 at 1:29 PM, Dong Lei dong...@microsoft.com wrote: Thanks Akhil: The driver fails too fast for me to get a look at port 4040. 
Is there any other way to see the download and shipping process of the files? Is the driver supposed to download these jars from HDFS to some location and then ship them to the executors? I can see from the log that the driver downloaded the application jar, but not the other jars specified by "--jars". Or do I misunderstand the usage of "--jars", and the jars should already be on every worker, so the driver will not download them? Are there any useful docs? Thanks Dong Lei *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Tuesday, June 9, 2015 3:24 PM *To:* Dong Lei *Cc:* user@spark.apache.org *Subject:* Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS Once you submit the application, you can check the Environment tab in the driver UI (running on port 4040) to see whether the jars you added got shipped or not. If they are shipped and you are still getting NoClassDef exceptions, then you have a jar conflict, which you can resolve by putting the jar containing the class at the top of your classpath. Thanks Best Regards On Tue, Jun 9, 2015 at 9:05 AM, Dong Lei dong...@microsoft.com wrote: Hi, spark-users: I'm using spark-submit to submit multiple jars and files (all in HDFS) to run a job, with the following command: spark-submit --class myClass --master spark://localhost:7077/ --deploy-mode cluster --jars hdfs://localhost/1.jar, hdfs://localhost/2.jar --files hdfs://localhost/1.txt, hdfs://localhost/2.txt hdfs://localhost/main.jar The stderr in the driver showed java.lang.ClassNotDefException for a class in 1.jar. I checked the log, and Spark has added these jars: INFO SparkContext: Added JAR hdfs:// …1.jar INFO SparkContext: Added JAR hdfs:// …2.jar In the driver's folder, I only saw that main.jar had been copied there, *but the other jars and files were not there*. Could someone explain *how I should pass the jars and files* needed by the main jar to Spark? 
If my class in main.jar refers to these files with a relative path, *will Spark copy these files into one folder*? BTW, my class works in client mode with all jars and files local. Thanks Dong Lei
RE: Join between DStream and Periodically-Changing-RDD
It depends on how big the batch RDD requiring reloading is. Reloading it for EVERY single DStream RDD would slow down the stream processing in line with the total time required to reload the batch RDD... But if the batch RDD is not that big, then that might not be an issue, especially in the context of the latency requirements for your streaming app. Another, more efficient and real-time approach may be to represent your batch RDD as DStream RDDs, keep aggregating them over the lifetime of the Spark Streaming app instance, and keep joining them with the actual DStream RDDs. You can feed your HDFS file into a message broker topic and consume it from there in the form of DStream RDDs, which you keep aggregating over the lifetime of the Spark Streaming app instance. From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, June 10, 2015 8:36 AM To: Ilove Data Cc: user@spark.apache.org Subject: Re: Join between DStream and Periodically-Changing-RDD RDDs are immutable; why not join two DStreams? Not sure, but you can also try something like this: kvDstream.foreachRDD(rdd => { val file = ssc.sparkContext.textFile("/sigmoid/") val kvFile = file.map(x => (x.split(",")(0), x)) rdd.join(kvFile) }) Thanks Best Regards On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data data4...@gmail.com wrote: Hi, I'm trying to join a DStream with an interval of, let's say, 20s with an RDD loaded from an HDFS folder which changes periodically; let's say a new file arrives in the folder every 10 minutes. How should this be done, considering that the files in the HDFS folder are periodically changing or being added? Does an RDD automatically detect changes in the HDFS folder it was loaded from and automatically reload? Thanks! Rendy
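Evo's trade-off is between cost and staleness. Reloading the reference data for every micro-batch costs time, while never reloading lets it go stale. A middle ground can be sketched outside Spark as a lookup table that is reloaded only when it is older than a time-to-live. This is plain Python under stated assumptions. The loader callable stands in for re-reading the HDFS folder. The 600-second TTL mirrors the "new file every 10 minutes" example. In Spark, the equivalent staleness check would presumably run on the driver inside foreachRDD or transform, before the join. RefreshingLookup and join_batch are hypothetical names.

```python
import time


class RefreshingLookup:
    """Reference data that is reloaded at most once per `ttl_seconds`."""

    def __init__(self, loader, ttl_seconds, clock=time.monotonic):
        self._loader = loader          # callable returning a dict of key -> reference value
        self._ttl = ttl_seconds
        self._clock = clock            # injectable so the refresh logic can be tested
        self._data = None
        self._loaded_at = None

    def get(self):
        now = self._clock()
        if self._data is None or now - self._loaded_at >= self._ttl:
            self._data = self._loader()
            self._loaded_at = now
        return self._data


def join_batch(batch, lookup):
    """Inner-join a micro-batch of (key, value) pairs with the current reference data."""
    reference = lookup.get()
    return [(key, (value, reference[key])) for key, value in batch if key in reference]
```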
Re: spark uses too much memory maybe (binaryFiles() with more than 1 million files in HDFS), groupBy or reduceByKey()
So, I don't have an explicit solution to your problem, but... On Wed, Jun 10, 2015 at 7:13 AM, Kostas Kougios kostas.koug...@googlemail.com wrote: I am profiling the driver. It currently has 564MB of strings which might be the 1mil file names. But also it has 2.34 GB of long[] ! That's so far, it is still running. What are those long[] used for? When Spark lists files it also needs all the extra metadata about where the files are in the HDFS cluster. That is a lot more than just the file's name - see the LocatedFileStatus class in the Hadoop docs for an idea. What you could try is to somehow break that input down into smaller batches, if that's feasible for your app. e.g. organize the files by directory and use separate directories in different calls to binaryFiles(), things like that. -- Marcelo
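Marcelo suggests splitting the input into smaller batches, for example by directory. Such batches could be planned with a small helper like the one below. It is a plain-Python sketch under stated assumptions. plan_batches is a hypothetical helper, and the file listing is assumed to be available as a list of paths. Each returned group of directories would then be read by its own binaryFiles() call or calls, with the aim of limiting how much file metadata the driver has to list at once.

```python
import posixpath
from collections import defaultdict


def plan_batches(paths, max_files):
    """Group input directories so each group holds at most max_files files,
    except that a directory larger than max_files forms a group of its own."""
    counts = defaultdict(int)
    for path in paths:
        counts[posixpath.dirname(path)] += 1

    batches, current, current_count = [], [], 0
    for directory in sorted(counts):
        n = counts[directory]
        if current and current_count + n > max_files:
            batches.append(current)        # close the current group before it overflows
            current, current_count = [], 0
        current.append(directory)
        current_count += n
    if current:
        batches.append(current)
    return batches
```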
Re: Split RDD based on criteria
While it does feel like a filter is what you want to do, a common way to handle this is to map to different keys. Using your rddList example, it becomes like this (Scala style): --- val rddSplit: RDD[(Int, Any)] = rdd.map(x => (createKey(x), x)) val rddBuckets: RDD[(Int, Iterable[Any])] = rddSplit.groupByKey --- You write createKey to do the equivalent work of your filters, and then you have a single RDD with your buckets. On Wed, Jun 10, 2015 at 5:56 AM dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I'm gathering that the typical approach for splitting an RDD is to apply several filters to it. rdd1 = rdd.filter(func1); rdd2 = rdd.filter(func2); ... Is there/should there be a way to create 'buckets' like these in one go? List<RDD> rddList = rdd.filter(func1, func2, ..., funcN) Another angle here is: when applying a filter(func), is there a way to get two RDDs back, one with the elements of the original RDD (the one being filtered) for which func returned true, and the other with the elements for which func returned false? PairRDD pair = rdd.filterTrueFalse(func); Right now I'm doing RDD x = rdd.filter(func); RDD y = rdd.filter(reverseOfFunc); This seems a bit tautological to me, though Spark must be optimizing this out (?) Thanks.
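The key-mapping idea can be shown on a local list. This is plain Python rather than Spark. split_by_key plays the role of map(x => (createKey(x), x)) followed by groupByKey, and partition returns the true/false halves in a single pass. Both names are hypothetical. On an RDD, groupByKey still yields one RDD of buckets, not several RDDs, and all values for a given key end up together.

```python
from collections import defaultdict


def split_by_key(records, create_key):
    """One pass over the data: bucket every record under create_key(record)."""
    buckets = defaultdict(list)
    for record in records:
        buckets[create_key(record)].append(record)
    return dict(buckets)


def partition(records, predicate):
    """One pass over the data: (records where predicate is true, records where it is false)."""
    matched, unmatched = [], []
    for record in records:
        (matched if predicate(record) else unmatched).append(record)
    return matched, unmatched
```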
Re: spark uses too much memory maybe (binaryFiles() with more than 1 million files in HDFS), groupBy or reduceByKey()
After some time the driver accumulated 6.67 GB of long[]. The executor memory usage so far is low.
spark streaming - checkpointing - looking at old application directory and failure to start streaming context
Hi, If checkpoint data is already present in HDFS, the driver fails to load because it performs a lookup on the previous application's directory. As that folder already exists, it fails to start the context. The failed job's application id was application_1432284018452_0635, and the job was performing a lookup on the application_1432284018452_0633 folder. Here's a snippet of the exception stack trace:
15/06/10 05:28:36 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Log directory hdfs://x.x.x.x:8020/user/myuser/spark/applicationHistory/application_1432284018452_0633 already exists!)
Exception in thread Driver java.io.IOException: Log directory hdfs://172.16.201.171:8020/user/shn/spark/applicationHistory/application_1432284018452_0633 already exists!
    at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129)
    at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
    at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
    at org.apache.spark.SparkContext.init(SparkContext.scala:368)
    at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:118)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:561)
    at
Any idea how to fix this issue? Thanks Ashish
Re: How to build spark with Hive 1.x ?
Hive version 1.x is currently not supported. Cheers On Wed, Jun 10, 2015 at 9:16 AM, Neal Yin neal@workday.com wrote: I am trying to build the Spark 1.3 branch with Hive 1.1.0. mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Phive-0.13.1 -Dhive.version=1.1.0 -Dhive.version.short=1.1.0 -DskipTests clean package I got the following error: Failed to execute goal on project spark-hive_2.10: Could not resolve dependencies for project org.apache.spark:spark-hive_2.10:jar:1.3.2-SNAPSHOT: The following artifacts could not be resolved: org.spark-project.hive:hive-metastore:jar:1.1.0, org.spark-project.hive:hive-exec:jar:1.1.0, org.spark-project.hive:hive-serde:jar:1.1.0: Could not find artifact org.spark-project.hive:hive-metastore:jar:1.1.0 in central (https://repo1.maven.org/maven2) - [Help 1] Then I checked Maven Central. http://search.maven.org/#browse|1655480514 It seems the latest hive-metastore jar is 0.13.1a. Does this mean Spark doesn't work with Hive version 1.x yet? Thanks, -Neal
Re: spark streaming - checkpointing - looking at old application directory and failure to start streaming context
I did not change the driver program. I just shut down the context and started it again. BTW, I see that this ticket is already open in an unassigned state, SPARK-6892 https://issues.apache.org/jira/browse/SPARK-6892, and it talks about this issue. Is this a known issue? Also, are there any workarounds? On Wed, Jun 10, 2015 at 9:18 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Delete the checkpoint directory, you might have modified your driver program. Thanks Best Regards On Wed, Jun 10, 2015 at 9:44 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, If checkpoint data is already present in HDFS, driver fails to load as it is performing lookup on previous application directory. As that folder already exists, it fails to start context. Failed job's application id was application_1432284018452_0635 and job was performing lookup on application_1432284018452_0633 folder. Here's snippet of exception stack trace- 15/06/10 05:28:36 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Log directory hdfs://x.x.x.x:8020/user/myuser/spark/applicationHistory/application_1432284018452_0633 already exists!) Exception in thread Driver java.io.IOException: Log directory hdfs://x.x.x.x :8020/user/myuswer/spark/applicationHistory/application_1432284018452_0633 http://172.16.201.171:8020/user/shn/spark/applicationHistory/application_1432284018452_0633 already exists! 
at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129) at org.apache.spark.util.FileLogger.start(FileLogger.scala:115) at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74) at org.apache.spark.SparkContext.init(SparkContext.scala:368) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:118) at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561) at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561) at scala.Option.map(Option.scala:145) at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:561) at Any idea on how to fix this issue? Thanks Ashish
Re: spark streaming - checkpointing - looking at old application directory and failure to start streaming context
Delete the checkpoint directory, you might have modified your driver program. Thanks Best Regards On Wed, Jun 10, 2015 at 9:44 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, If checkpoint data is already present in HDFS, driver fails to load as it is performing lookup on previous application directory. As that folder already exists, it fails to start context. Failed job's application id was application_1432284018452_0635 and job was performing lookup on application_1432284018452_0633 folder. Here's snippet of exception stack trace- 15/06/10 05:28:36 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Log directory hdfs://x.x.x.x:8020/user/myuser/spark/applicationHistory/application_1432284018452_0633 already exists!) Exception in thread Driver java.io.IOException: Log directory hdfs:// 172.16.201.171:8020/user/shn/spark/applicationHistory/application_1432284018452_0633 already exists! at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129) at org.apache.spark.util.FileLogger.start(FileLogger.scala:115) at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74) at org.apache.spark.SparkContext.init(SparkContext.scala:368) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:118) at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561) at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561) at scala.Option.map(Option.scala:145) at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:561) at Any idea on how to fix this issue? Thanks Ashish
How to build spark with Hive 1.x ?
I am trying to build the Spark 1.3 branch with Hive 1.1.0. mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Phive-0.13.1 -Dhive.version=1.1.0 -Dhive.version.short=1.1.0 -DskipTests clean package I got the following error: Failed to execute goal on project spark-hive_2.10: Could not resolve dependencies for project org.apache.spark:spark-hive_2.10:jar:1.3.2-SNAPSHOT: The following artifacts could not be resolved: org.spark-project.hive:hive-metastore:jar:1.1.0, org.spark-project.hive:hive-exec:jar:1.1.0, org.spark-project.hive:hive-serde:jar:1.1.0: Could not find artifact org.spark-project.hive:hive-metastore:jar:1.1.0 in central (https://repo1.maven.org/maven2) - [Help 1] Then I checked Maven Central. http://search.maven.org/#browse|1655480514 It seems the latest hive-metastore jar is 0.13.1a. Does this mean Spark doesn't work with Hive version 1.x yet? Thanks, -Neal
Re: Split RDD based on criteria
No, but you can write a couple of lines of code that do this. It's not optimized, of course. This is actually a long and interesting side discussion, but I'm not sure how much it could be optimized, given that the computation is pull rather than push; there is no concept of one pass over the data resulting in many RDDs. However, you can cache/persist the source RDD to at least make sure it is not recomputed. I don't think groupByKey is quite a solution, since it means one RDD for which all values for one key must fit in memory, and because the desired output is an RDD, I am not sure that is suitable. On Wed, Jun 10, 2015 at 1:56 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I'm gathering that the typical approach for splitting an RDD is to apply several filters to it. rdd1 = rdd.filter(func1); rdd2 = rdd.filter(func2); ... Is there/should there be a way to create 'buckets' like these in one go? List<RDD> rddList = rdd.filter(func1, func2, ..., funcN) Another angle here is: when applying a filter(func), is there a way to get two RDDs back, one with the elements of the original RDD (the one being filtered) for which func returned true, and the other with the elements for which func returned false? PairRDD pair = rdd.filterTrueFalse(func); Right now I'm doing RDD x = rdd.filter(func); RDD y = rdd.filter(reverseOfFunc); This seems a bit tautological to me, though Spark must be optimizing this out (?) Thanks.
Re: Issue running Spark 1.4 on Yarn
Hi nsalian, For some reason the rest of this thread isn't showing up here. The NodeManager isn't busy. I'll copy/paste; the details are in there. I've tried running a Hadoop app pointing to the same queue. Same thing now: the job doesn't get accepted. I've cleared out the queue and killed all the pending jobs; the queue is still unusable. It seems like an issue with YARN, but it's specifically Spark that leaves the queue in this state. I've run a Hadoop job in a for loop 10x, while specifying the queue explicitly, just to double-check. On Tue, Jun 9, 2015 at 4:45 PM, Matt Kapilevich matve...@gmail.com wrote: From the RM scheduler, I see 3 applications currently stuck in the root.thequeue queue. Used Resources: memory:0, vCores:0 Num Active Applications: 0 Num Pending Applications: 3 Min Resources: memory:0, vCores:0 Max Resources: memory:6655, vCores:4 Steady Fair Share: memory:1664, vCores:0 Instantaneous Fair Share: memory:6655, vCores:0 On Tue, Jun 9, 2015 at 4:30 PM, Matt Kapilevich matve...@gmail.com wrote: Yes! If I either specify a different queue or don't specify a queue at all, it works. On Tue, Jun 9, 2015 at 4:25 PM, Marcelo Vanzin van...@cloudera.com wrote: Does it work if you don't specify a queue? On Tue, Jun 9, 2015 at 1:21 PM, Matt Kapilevich matve...@gmail.com wrote: Hi Marcelo, Yes, restarting YARN fixes this behavior, and it again works the first few times. The only thing that's consistent is that once Spark job submissions stop working, it's broken for good. On Tue, Jun 9, 2015 at 4:12 PM, Marcelo Vanzin van...@cloudera.com wrote: Apologies, I see you already posted everything from the RM logs that mentions your stuck app. Have you tried restarting the YARN cluster to see if that changes anything? Does it go back to the "first few tries work" behaviour? I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything like this. 
On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin van...@cloudera.com wrote: On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com wrote: Like I mentioned earlier, I'm able to execute Hadoop jobs fine even now; this problem is specific to Spark. That doesn't necessarily mean anything. Spark apps have different resource requirements than Hadoop apps. Check your RM logs for any line that mentions your Spark app id. That may give you some insight into what's happening or not. -- Marcelo -- Marcelo -- Marcelo
Re: Spark Maven Test error
Dear List, I'm trying to reference a lonely message to this list from March 25th (http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Maven-Test-error-td22216.html), but I'm unsure whether this will thread properly. Sorry if it didn't work out. Anyway, using Spark 1.4.0-RC4 I run into the same issue when performing tests, using build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver test after a successful clean package build. The error is: java.lang.IllegalStateException: failed to create a child event loop Could this be due to another instance of Spark blocking ports? In that case, maybe the test case should be able to adapt to this particular issue. Thanks for any help, Rick
Re: Determining number of executors within RDD
On YARN, there is no concept of a Spark Worker. Multiple executors will be run per node without any effort required by the user, as long as all the executors fit within each node's resource limits. -Sandy On Wed, Jun 10, 2015 at 3:24 PM, Evo Eftimov evo.efti...@isecc.com wrote: Yes, I think it is ONE worker, ONE executor, as an executor is nothing but a JVM instance spawned by the worker. To run more executors, i.e. JVM instances, on the same physical cluster node, you need to run more than one worker on that node and then allocate only part of the system resources to that worker/executor. Original message From: maxdml Date: 2015/06/10 19:56 (GMT+00:00) To: user@spark.apache.org Subject: Re: Determining number of executors within RDD Actually this is somewhat confusing for two reasons: - First, the option 'spark.executor.instances', which seems to be handled only in the case of YARN in the source code of SparkSubmit.scala, is also present in the conf/spark-env.sh file under the standalone section, which would indicate that it is also available for this mode. - Second, a post from Andrew Or states that these properties define the number of workers in the cluster, not the number of executors on a given worker. (http://apache-spark-user-list.1001560.n3.nabble.com/clarification-for-some-spark-on-yarn-configuration-options-td13692.html) Could anyone clarify this? :-) Thanks.
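Sandy says that YARN packs as many executors onto a node as fit within its resource limits. That can be approximated with a little arithmetic. This sketch is a simplification under assumptions. The per-executor memory overhead defaults (10% of executor memory, at least 384 MB) are my assumption about spark.yarn.executor.memoryOverhead in this era and may differ in your version. The sketch also ignores YARN rounding container sizes up to the scheduler's minimum allocation increment. executors_per_node is a hypothetical helper.

```python
def executors_per_node(node_mem_mb, node_vcores, executor_mem_mb, executor_cores,
                       overhead_fraction=0.10, min_overhead_mb=384):
    """Rough count of executor containers that fit on one NodeManager."""
    # Assumed default overhead rule; check spark.yarn.executor.memoryOverhead for your version.
    overhead_mb = max(min_overhead_mb, int(executor_mem_mb * overhead_fraction))
    container_mb = executor_mem_mb + overhead_mb
    by_memory = node_mem_mb // container_mb
    by_cores = node_vcores // executor_cores
    return min(by_memory, by_cores)
```

Take a node with 64 GB (65536 MB) and 16 vcores, running executors with 8 GB and 4 cores each. The sketch gives 4 executors, limited by cores rather than memory (memory alone would allow 7).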
Problem with pyspark on Docker talking to YARN cluster
All, I was wondering if any of you have solved this problem: I have pyspark (IPython mode) running on Docker, talking to a YARN cluster (the AM/executors are NOT running on Docker). When I start pyspark in the Docker container, it binds to port *49460.* Once the app is submitted to YARN, the app (AM) on the cluster side fails with the following error message: *ERROR yarn.ApplicationMaster: Failed to connect to driver at :49460* This makes sense because the AM is trying to talk to the container directly and it cannot; it should be talking to the Docker host instead. *Question*: How do we make the Spark AM talk to host1:port1 of the Docker host (not the container), which would then route it to the container running pyspark on host2:port2? One solution I could think of is: after starting the driver (say on hostA:portA), and before submitting the app to YARN, we could reset the driver's host/port to the host machine's IP/port. The AM could then talk to the host machine's IP/port, which would be mapped to the container. Thoughts? -- Thanks, Ashwin
Re: Determining number of executors within RDD
Yes, I think it is ONE worker, ONE executor, as an executor is nothing but a JVM instance spawned by the worker. To run more executors, i.e. JVM instances, on the same physical cluster node, you need to run more than one worker on that node and then allocate only part of the system resources to that worker/executor. Original message From: maxdml max...@cs.duke.edu Date: 2015/06/10 19:56 (GMT+00:00) To: user@spark.apache.org Subject: Re: Determining number of executors within RDD Actually this is somewhat confusing for two reasons: - First, the option 'spark.executor.instances', which seems to be handled only in the case of YARN in the source code of SparkSubmit.scala, is also present in the conf/spark-env.sh file under the standalone section, which would indicate that it is also available for this mode. - Second, a post from Andrew Or states that these properties define the number of workers in the cluster, not the number of executors on a given worker. (http://apache-spark-user-list.1001560.n3.nabble.com/clarification-for-some-spark-on-yarn-configuration-options-td13692.html) Could anyone clarify this? :-) Thanks.
Re: Efficient way to get top K values per key in (key, value) RDD?
Hi, I am a Spark newbie trying to solve the same problem, and I have implemented exactly the solution that sowen is suggesting. I am using priority queues to keep track of the top 25 sub_categories for each category, and the combineByKey function to do that. However, I run into the following exception when I submit the Spark job: ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 17) java.lang.UnsupportedOperationException: unsuitable as hash key at scala.collection.mutable.PriorityQueue.hashCode(PriorityQueue.scala:226) From the error it looks like Spark is trying to use the mutable priority queue as a hash key, so the error makes sense, but I don't get why it is doing that, since the value of the RDD record is a priority queue, not the key. Maybe there is a more straightforward solution to what I want to achieve, so any suggestion is appreciated :) Thanks, Erisa
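One way to avoid the mutable PriorityQueue entirely is to keep a plain, bounded min-heap per key. The sketch below is plain Python, not Spark. create_combiner, merge_value and merge_combiners have the shapes of the three functions that combineByKey takes. combine_by_key is a hypothetical local stand-in that applies them per partition and then merges across partitions. TOP_K = 3 is an arbitrary choice (the post keeps 25). In Scala, the analogous idea would be a plain or immutable collection as the combiner; I have not verified that this alone avoids the hashCode error.

```python
import heapq

TOP_K = 3  # the post keeps the top 25 sub-categories; 3 keeps the example small


def create_combiner(value):
    return [value]  # a one-element list is already a valid min-heap


def merge_value(heap, value):
    # Keep at most TOP_K items; heap[0] is the smallest of the current top items.
    if len(heap) < TOP_K:
        heapq.heappush(heap, value)
    elif value > heap[0]:
        heapq.heapreplace(heap, value)
    return heap


def merge_combiners(left, right):
    merged = heapq.nlargest(TOP_K, left + right)
    heapq.heapify(merged)  # nlargest returns a descending list, so restore heap order
    return merged


def combine_by_key(partitions):
    """Local stand-in for rdd.combineByKey(...) over a list of partitions."""
    per_partition = []
    for part in partitions:
        accumulators = {}
        for key, value in part:
            if key in accumulators:
                accumulators[key] = merge_value(accumulators[key], value)
            else:
                accumulators[key] = create_combiner(value)
        per_partition.append(accumulators)
    result = {}
    for accumulators in per_partition:
        for key, heap in accumulators.items():
            result[key] = merge_combiners(result[key], heap) if key in result else heap
    return {key: sorted(heap, reverse=True) for key, heap in result.items()}
```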
Re: Issue running Spark 1.4 on Yarn
Hi, Thanks for the added information; it helps add more context. Is that specific queue configured differently from the others? FairScheduler.xml should have the information needed, or a separate allocations.xml if you have one. Something of this format:

<allocations>
  <queue name="sample_queue">
    <minResources>1 mb,0vcores</minResources>
    <maxResources>9 mb,0vcores</maxResources>
    <maxRunningApps>50</maxRunningApps>
    <maxAMShare>0.1</maxAMShare>
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
    <queue name="sample_sub_queue">
      <aclSubmitApps>charlie</aclSubmitApps>
      <minResources>5000 mb,0vcores</minResources>
    </queue>
  </queue>
</allocations>

Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-running-Spark-1-4-on-Yarn-tp23211p23261.html
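To answer whether one queue is configured differently from the others, the allocation file can be dumped queue by queue. A minimal Python sketch using only the standard xml.etree.ElementTree module; the dotted queue path is a naming choice of this sketch (not necessarily the queue name YARN reports), and the embedded XML is the sample from this message.

```python
import xml.etree.ElementTree as ET

ALLOCATIONS = """
<allocations>
  <queue name="sample_queue">
    <minResources>1 mb,0vcores</minResources>
    <maxResources>9 mb,0vcores</maxResources>
    <maxRunningApps>50</maxRunningApps>
    <maxAMShare>0.1</maxAMShare>
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
    <queue name="sample_sub_queue">
      <aclSubmitApps>charlie</aclSubmitApps>
      <minResources>5000 mb,0vcores</minResources>
    </queue>
  </queue>
</allocations>
"""


def queue_settings(xml_text):
    """Map each queue's dotted path to its own (non-queue) child settings."""
    root = ET.fromstring(xml_text.strip())
    settings = {}

    def walk(element, prefix):
        for queue in element.findall("queue"):
            name = queue.get("name")
            path = f"{prefix}.{name}" if prefix else name
            settings[path] = {
                child.tag: (child.text or "").strip()
                for child in queue
                if child.tag != "queue"
            }
            walk(queue, path)  # descend into nested queues

    walk(root, "")
    return settings


for path, values in queue_settings(ALLOCATIONS).items():
    print(path, values)
```

Running it against the real allocation file and diffing the entries for the problematic queue against its siblings is the intended use.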
Re: spark-submit does not use hive-site.xml
Thanks for your help! Switching to HiveContext fixed the issue. Just one side comment: in the documentation on Hive tables and HiveContext (https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables), we see:
// sc is an existing JavaSparkContext.
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
But this is incorrect, as the HiveContext constructor does not accept a JavaSparkContext but a SparkContext (the comment is basically misleading). The correct code snippet should be:
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
Thanks again for your help. On Wed, Jun 10, 2015 at 1:17 AM, Cheng Lian lian.cs@gmail.com wrote: Hm, this is a common confusion... Although the variable name is `sqlContext` in the Spark shell, it's actually a `HiveContext`, which extends `SQLContext` and can communicate with the Hive metastore. So your program needs to instantiate an `org.apache.spark.sql.hive.HiveContext` instead. Cheng On 6/10/15 10:19 AM, James Pirz wrote: I am using Spark (standalone) to run queries (from a remote client) against data in tables that are already defined/loaded in Hive. I have started the metastore service in Hive successfully, and I tried to share its config with Spark by putting hive-site.xml, with the proper metastore.uri, in the $SPARK_HOME/conf directory. When I start spark-shell, it gives me a default sqlContext, and I can use that to access my Hive tables with no problem. But once I submit a similar query via a Spark application through 'spark-submit', it does not see the tables; it seems it does not pick up the hive-site.xml under the conf directory in Spark's home. I tried using the '--files' argument with spark-submit to pass hive-site.xml to the workers, but it did not change anything.
Here is how I try to run the application:
$SPARK_HOME/bin/spark-submit --class SimpleClient --master spark://my-spark-master:7077 --files=$SPARK_HOME/conf/hive-site.xml simple-sql-client-1.0.jar
Here is the simple example code that I try to run (in Java):
SparkConf conf = new SparkConf().setAppName("Simple SQL Client");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame res = sqlContext.sql("show tables");
res.show();
Here are the software versions: Spark: 1.3, Hive: 1.2, Hadoop: 2.6. Thanks in advance for any suggestions.
RE: [SPARK-6330] 1.4.0/1.5.0 Bug to access S3 -- AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyI
I have tried both cases (s3 and s3n, setting all possible parameters), and trust me, the same code works with 1.3.1 but not with 1.3.0, 1.4.0 or 1.5.0. I even used a plain project to test this, with Maven to include all referenced libraries, but it gives me the error. I think everyone can easily replicate my issue locally (the code doesn't need to run on EC2; I run it directly from my local Windows PC). Regards, Shuai From: Aaron Davidson [mailto:ilike...@gmail.com] Sent: Wednesday, June 10, 2015 12:28 PM To: Shuai Zheng Subject: Re: [SPARK-6330] 1.4.0/1.5.0 Bug to access S3 -- AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively) That exception is a bit weird, as it refers to fs.s3 instead of fs.s3n. Maybe you are accidentally using s3://? Otherwise, you might try specifying that property too. On Jun 9, 2015 12:45 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I have some code to access S3 from Spark. The code is as simple as:
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", "---");
hadoopConf.set("fs.s3n.awsSecretAccessKey", "--");
SQLContext sql = new SQLContext(ctx);
DataFrame grid_lookup = sql.parquetFile("s3n://---");
grid_lookup.count();
ctx.stop();
The code works with 1.3.1, but with 1.4.0 and the latest 1.5.0 it always gives me the exception below: Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
I don't know why. I remember this was a known issue in 1.3.0 (https://issues.apache.org/jira/browse/SPARK-6330) that was solved in 1.3.1, so why is it not working again in a newer version? When I switched to 1.4.0, it worked for a while (when I was working with the master branch of the latest source code), but after I refreshed to the latest code, I got this error again. Does anyone have an idea? Regards, Shuai
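Aaron's suggestion (check that the URL really uses s3n://, and otherwise also set the fs.s3.* property named in the exception) can be written as a small helper that returns every credential property to set. A Python sketch; the bucket and key values are placeholders, and whether setting both prefixes works around the regression described here is an assumption, not something this thread confirms.

```python
from urllib.parse import urlparse


def s3_credential_properties(url, access_key_id, secret_access_key):
    """Hadoop credential properties for the URL's scheme plus the fs.s3.* variants.

    The exception in this thread names fs.s3.awsAccessKeyId, so the fs.s3.*
    keys are always included alongside the scheme-specific ones.
    """
    scheme = urlparse(url).scheme  # "s3n" for s3n://bucket/path
    if scheme not in ("s3", "s3n"):
        raise ValueError(f"not an s3/s3n URL: {url!r}")
    props = {}
    for prefix in sorted({scheme, "s3"}):
        props[f"fs.{prefix}.awsAccessKeyId"] = access_key_id
        props[f"fs.{prefix}.awsSecretAccessKey"] = secret_access_key
    return props


for name, value in s3_credential_properties("s3n://example-bucket/data", "KEY", "SECRET").items():
    print(name, value)
```

Each returned pair would then go through hadoopConf.set(name, value), as in the Java snippet in this thread.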
Re: spark-submit does not use hive-site.xml
Thanks for pointing out the documentation error :) Opened https://github.com/apache/spark/pull/6749 to fix this. On 6/11/15 1:18 AM, James Pirz wrote: Thanks for your help! Switching to HiveContext fixed the issue. Just one side comment: in the documentation on Hive tables and HiveContext (https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables), we see:
// sc is an existing JavaSparkContext.
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
But this is incorrect, as the HiveContext constructor does not accept a JavaSparkContext but a SparkContext (the comment is basically misleading). The correct code snippet should be:
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
Thanks again for your help. On Wed, Jun 10, 2015 at 1:17 AM, Cheng Lian lian.cs@gmail.com wrote: Hm, this is a common confusion... Although the variable name is `sqlContext` in the Spark shell, it's actually a `HiveContext`, which extends `SQLContext` and can communicate with the Hive metastore. So your program needs to instantiate an `org.apache.spark.sql.hive.HiveContext` instead. Cheng On 6/10/15 10:19 AM, James Pirz wrote: I am using Spark (standalone) to run queries (from a remote client) against data in tables that are already defined/loaded in Hive. I have started the metastore service in Hive successfully, and I tried to share its config with Spark by putting hive-site.xml, with the proper metastore.uri, in the $SPARK_HOME/conf directory. When I start spark-shell, it gives me a default sqlContext, and I can use that to access my Hive tables with no problem. But once I submit a similar query via a Spark application through 'spark-submit', it does not see the tables; it seems it does not pick up the hive-site.xml under the conf directory in Spark's home. I tried using the '--files' argument with spark-submit to pass hive-site.xml to the workers, but it did not change anything.
Here is how I try to run the application:
$SPARK_HOME/bin/spark-submit --class SimpleClient --master spark://my-spark-master:7077 --files=$SPARK_HOME/conf/hive-site.xml simple-sql-client-1.0.jar
Here is the simple example code that I try to run (in Java):
SparkConf conf = new SparkConf().setAppName("Simple SQL Client");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame res = sqlContext.sql("show tables");
res.show();
Here are the software versions: Spark: 1.3, Hive: 1.2, Hadoop: 2.6. Thanks in advance for any suggestions.
Re: Determining number of executors within RDD
Actually, this is somewhat confusing for two reasons: - First, the option 'spark.executor.instances', which seems to be handled only for YARN in the source code of SparkSubmit.scala, is also present in the conf/spark-env.sh file under the standalone section, which would indicate that it is also available in this mode. - Second, a post from Andrew Or states that this property defines the number of workers in the cluster, not the number of executors on a given worker. (http://apache-spark-user-list.1001560.n3.nabble.com/clarification-for-some-spark-on-yarn-configuration-options-td13692.html) Could anyone clarify this? :-) Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Determining-number-of-executors-within-RDD-tp15554p23262.html
Hive Custom Transform Scripts (read from stdin and print to stdout) in Spark
What is the best way to reuse, in Spark, Hive custom transform scripts written in Python, awk or C++ that read data from stdin and print to stdout? These scripts typically use the TRANSFORM syntax in Hive: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Hive-Custom-Transform-Scripts-read-from-stdin-and-print-to-stdout-in-Spark-tp23264.html
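Spark's RDD.pipe(command) runs an external command once per partition, writing the partition's elements to its stdin one per line and returning its stdout lines as a new RDD, so it is one candidate for reusing such scripts. The sketch below only imitates that per-partition contract locally with subprocess; the upper-casing script is a hypothetical stand-in for a real TRANSFORM script, and the tab-separated rows are an assumed input format.

```python
import subprocess
import sys


def pipe_partition(lines, command):
    """Feed one partition's records to an external script, one per line; collect its stdout lines."""
    completed = subprocess.run(
        command,
        input="".join(line + "\n" for line in lines),
        capture_output=True,
        text=True,
        check=True,  # raise if the script exits with a non-zero status
    )
    return completed.stdout.splitlines()


# Hypothetical stand-in for a Hive TRANSFORM script: upper-cases each input row.
script = [sys.executable, "-c",
          "import sys\nfor line in sys.stdin:\n    print(line.rstrip('\\n').upper())"]

partitions = [["a\t1", "b\t2"], ["c\t3"]]
output = [row for part in partitions for row in pipe_partition(part, script)]
print(output)  # ['A\t1', 'B\t2', 'C\t3']
```

In Spark the script itself would also have to be present on every worker (or shipped with the job), since each executor launches it locally.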
Re: Determining number of executors within RDD
This PR adds support for multiple executors per worker: https://github.com/apache/spark/pull/731, and it should be available in 1.4. Thanks, Nishkam On Wed, Jun 10, 2015 at 1:35 PM, Evo Eftimov evo.efti...@isecc.com wrote: We were discussing STANDALONE mode; besides, maxdml had already summarized what is available and possible under YARN. So let me recap: in standalone mode, if you need more than one executor per physical host, e.g. to partition its system resources more finely (especially RAM per JVM instance), you need to go for what is essentially a bit of a hack, i.e. running more than one worker per machine. -------- Original message -------- From: Sandy Ryza Date: 2015/06/10 21:31 (GMT+00:00) To: Evo Eftimov Cc: maxdml, user@spark.apache.org Subject: Re: Determining number of executors within RDD On YARN, there is no concept of a Spark Worker. Multiple executors will be run per node without any effort required by the user, as long as all the executors fit within each node's resource limits. -Sandy On Wed, Jun 10, 2015 at 3:24 PM, Evo Eftimov evo.efti...@isecc.com wrote: Yes, I think it is one worker, one executor, as an executor is nothing but a JVM instance spawned by the worker. To run more executors (i.e. JVM instances) on the same physical cluster node, you need to run more than one worker on that node and then allocate only part of the system resources to that worker/executor. -------- Original message -------- From: maxdml Date: 2015/06/10 19:56 (GMT+00:00) To: user@spark.apache.org Subject: Re: Determining number of executors within RDD Actually, this is somewhat confusing for two reasons: - First, the option 'spark.executor.instances', which seems to be handled only for YARN in the source code of SparkSubmit.scala, is also present in the conf/spark-env.sh file under the standalone section, which would indicate that it is also available in this mode. - Second, a post from Andrew Or states that this property defines the number of workers in the cluster, not the number of executors on a given worker. ( http://apache-spark-user-list.1001560.n3.nabble.com/clarification-for-some-spark-on-yarn-configuration-options-td13692.html ) Could anyone clarify this? :-) Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Determining-number-of-executors-within-RDD-tp15554p23262.html
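Sandy's point that YARN places executors on a node as long as they fit within its resource limits comes down to simple arithmetic, and the same arithmetic applies when sizing several workers per node in standalone mode. A minimal sketch; the node sizes are hypothetical, and overhead_mb is a plain parameter standing in for any per-executor memory overhead, not a reading of Spark's actual defaults.

```python
def executors_per_node(node_cores, node_mem_mb, executor_cores, executor_mem_mb, overhead_mb=0):
    """How many executors of the given size fit on one node, limited by cores and by memory.

    overhead_mb is a caller-supplied per-executor memory allowance (hypothetical default 0).
    """
    if executor_cores <= 0 or executor_mem_mb + overhead_mb <= 0:
        raise ValueError("executor size must be positive")
    by_cores = node_cores // executor_cores
    by_memory = node_mem_mb // (executor_mem_mb + overhead_mb)
    return min(by_cores, by_memory)


# Hypothetical node: 16 cores and 64 GB available for executors.
print(executors_per_node(16, 65536, 4, 8192))        # cores limit: 16 // 4 = 4
print(executors_per_node(16, 65536, 2, 8192, 1024))  # memory limit: 65536 // (8192 + 1024) = 7
```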
Can't access Ganglia on EC2 Spark cluster
Launching using the spark-ec2 script results in:
Setting up ganglia
RSYNC'ing /etc/ganglia to slaves...
...
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Connection to ... closed.
...
Stopping httpd: [FAILED]
Starting httpd: httpd: Syntax error on line 199 of /etc/httpd/conf/httpd.conf: Cannot load modules/libphp-5.5.so into server: /etc/httpd/modules/libphp-5.5.so: cannot open shared object file: No such file or directory
[FAILED]
[timing] ganglia setup: 00h 00m 03s
Connection to ... closed.
Spark standalone cluster started at ...:8080
Ganglia started at ...:5080/ganglia
Done!
However, when I run `netstat`, nothing is listening on port 5080. Is this related to the httpd error above, or is it something else? Thanks
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-Ganglia-on-EC2-Spark-cluster-tp23266.html
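netstat on the master shows what is listening there; from the client machine it can also help to test whether the port accepts TCP connections at all. A minimal Python sketch using only the socket module; host names and ports are placeholders. A False result does not distinguish "nothing is listening" from "a firewall or security group is dropping the traffic".

```python
import socket


def port_accepts_connections(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable, ...
        return False
```

For example, port_accepts_connections("master-host", 5080) run from the client, where master-host is a hypothetical name for the EC2 master.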
Re: How to set KryoRegistrator class in spark-shell
You need to register it using spark-defaults.conf, as explained here: https://books.google.com/books?id=WE_GBwAAQBAJ&pg=PA239&lpg=PA239&dq=spark+shell+register+kryo+serialization&source=bl&ots=vCxgEfz1-2&sig=dHU8FY81zVoBqYIJbCFuRwyFjAw&hl=en&sa=X&ved=0CEwQ6AEwB2oVChMIn_iujpCGxgIVDZmICh3kYADW#v=onepage&q=spark%20shell%20register%20kryo%20serialization&f=false -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-KryoRegistrator-class-in-spark-shell-tp12498p23265.html
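The two properties involved are spark.serializer and spark.kryo.registrator. conf/spark-defaults.conf takes one whitespace-separated key and value per line, and the same pairs can also be passed to spark-shell as --conf key=value. A tiny Python sketch that renders such lines; the registrator class name is hypothetical, and the class has to be available on the driver and executor classpath.

```python
def render_spark_defaults(props):
    """Render properties in the spark-defaults.conf layout: one 'key value' pair per line."""
    return "".join(f"{key} {value}\n" for key, value in sorted(props.items()))


kryo_props = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    # Hypothetical registrator class name; replace it with your own implementation.
    "spark.kryo.registrator": "com.example.MyKryoRegistrator",
}
print(render_spark_defaults(kryo_props), end="")
# spark.kryo.registrator com.example.MyKryoRegistrator
# spark.serializer org.apache.spark.serializer.KryoSerializer
```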
Re: Fully in-memory shuffles
In many cases the shuffle will actually hit the OS buffer cache and never touch spinning disk, if its size is smaller than the memory on the machine. - Patrick On Wed, Jun 10, 2015 at 5:06 PM, Corey Nolet cjno...@gmail.com wrote: So with this... to help my understanding of Spark under the hood: is this statement correct? "When data needs to pass between multiple JVMs, a shuffle will always hit disk." On Wed, Jun 10, 2015 at 10:11 AM, Josh Rosen rosenvi...@gmail.com wrote: There's a discussion of this at https://github.com/apache/spark/pull/5403 On Wed, Jun 10, 2015 at 7:08 AM, Corey Nolet cjno...@gmail.com wrote: Is it possible to configure Spark to do all of its shuffling FULLY in memory (given that I have enough memory to store all the data)?
Re: Fully in-memory shuffles
So with this... to help my understanding of Spark under the hood: is this statement correct? "When data needs to pass between multiple JVMs, a shuffle will *always* hit disk." On Wed, Jun 10, 2015 at 10:11 AM, Josh Rosen rosenvi...@gmail.com wrote: There's a discussion of this at https://github.com/apache/spark/pull/5403 On Wed, Jun 10, 2015 at 7:08 AM, Corey Nolet cjno...@gmail.com wrote: Is it possible to configure Spark to do all of its shuffling FULLY in memory (given that I have enough memory to store all the data)?
Re: RDD of RDDs
Thanks very much for the detailed explanations. I suspected this came down to architectural support for the notion of an RDD of RDDs, but my understanding of Spark, and of distributed computing in general, is not deep enough for me to work it out myself, so this really helps! I ended up going with List[RDD]. The number of unique users in my dataset is manageable (2000 or so), so I simply put each one into its own RDD by doing:
for user in users:
    userrdd = bigrdd.filter(lambda x: x[userid_pos] == user)
Thanks for helping out! Ping On Tue, Jun 9, 2015 at 1:17 AM kiran lonikar loni...@gmail.com wrote: A similar question was asked before: http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html Here is one of the reasons why I think RDD[RDD[T]] is not possible:
- An RDD is only a handle to the actual data partitions. It has a reference/pointer to the SparkContext object (sc) and a list of partitions.
- The SparkContext is an object in the Spark application/driver program's JVM. Similarly, the list of partitions is also in the JVM of the driver program. Each partition contains a kind of remote reference to the partition data on the worker JVMs.
- The functions passed to an RDD's transformations and actions execute in the workers' JVMs on different nodes. For example, in rdd.map { x => x*x }, the function computing x*x runs on the JVMs of the worker nodes where the partitions of the RDD reside. These JVMs do not have access to sc, since it exists only in the driver's JVM.
- Thus, in the case of your RDD of RDDs, outerRDD.map { innerRDD => innerRDD.filter { x => x*x } }, the worker nodes will not be able to execute the filter on innerRDD, as the code in the worker does not have access to sc and cannot launch a Spark job.
Hope it helps. You need to consider List[RDD] or some other collection. -Kiran On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote: Hi, The problem I am looking at is as follows:
- I read a log file of multiple users into an RDD.
- I'd like to group the above RDD into multiple RDDs by userId (the key).
- My processEachUser() function then takes each RDD mapped to an individual user and calls RDD.map or DataFrame operations on it. (I have already coded the function, so I am reluctant to work with the ResultIterable object coming out of rdd.groupByKey() ...)
I've searched the mailing list and googled "RDD of RDDs", and it seems it isn't a thing at all. The few choices left seem to be: 1) groupByKey() and then work with the ResultIterable object; 2) groupByKey() and then write each group to a file, and read the files back as individual RDDs to process. Has anyone got a better idea or had a similar problem before? Thanks! Ping -- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721
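One caveat with the per-user filter loop: Python closures look up variables when they run, not when they are defined. If the filtered RDDs are kept and only evaluated after the loop, every lambda may end up comparing against the last user (this assumes PySpark serializes the function lazily, when an action runs). The plain-Python sketch below shows the effect, the usual default-argument fix, and a single-pass grouping as the in-memory analogue of groupByKey; the record layout is hypothetical and no Spark is involved.

```python
def filters_late_bound(users, pos):
    predicates = []
    for user in users:
        # Closes over the variable `user`, which holds the last user once the loop ends.
        predicates.append(lambda x: x[pos] == user)
    return predicates


def filters_bound(users, pos):
    predicates = []
    for user in users:
        # The default argument freezes the value of `user` at definition time.
        predicates.append(lambda x, u=user: x[pos] == u)
    return predicates


def group_by_user(records, pos):
    """Single-pass alternative: bucket every record by its user id."""
    groups = {}
    for record in records:
        groups.setdefault(record[pos], []).append(record)
    return groups


records = [("u1", "login"), ("u2", "click"), ("u1", "logout")]
users = ["u1", "u2"]
print([list(filter(p, records)) for p in filters_late_bound(users, 0)])
# [[('u2', 'click')], [('u2', 'click')]]  <- both predicates see the last user
print([list(filter(p, records)) for p in filters_bound(users, 0)])
# [[('u1', 'login'), ('u1', 'logout')], [('u2', 'click')]]
```

Independently of the closure issue, each filter is a separate pass over bigrdd, so caching bigrdd before the loop is worth considering with around 2000 users.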
NullPointerException with functions.rand()
Hello, I am using 1.4.0 and found the following weird behavior. This case works fine:
scala> sc.parallelize(Seq((1,2), (3, 100))).toDF.withColumn("index", rand(30)).show()
+--+---+-------------------+
|_1| _2|              index|
+--+---+-------------------+
| 1|  2| 0.6662967911724369|
| 3|100|0.35734504984676396|
+--+---+-------------------+
However, when I use sqlContext.createDataFrame instead, I get an NPE:
scala> sqlContext.createDataFrame(Seq((1,2), (3, 100))).withColumn("index", rand(30)).show()
java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.RDG.rng$lzycompute(random.scala:39)
  at org.apache.spark.sql.catalyst.expressions.RDG.rng(random.scala:39)
  ..
Does anyone know why? Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-with-functions-rand-tp23267.html
Re: Spark not working on windows 7 64 bit
You may compare the c:\windows\system32\drivers\etc\hosts files to see whether they are configured similarly. On Wed, Jun 10, 2015 at 17:16, Eran Medan eran.me...@gmail.com wrote: I've hit a roadblock trying to understand why Spark doesn't work for a colleague of mine on his Windows 7 laptop. I have pretty much the same setup, and everything works fine for me. I googled the error message and didn't find anything that resolved it. Here is the exception message (after running a vanilla Spark 1.3.1 installation prebuilt for Hadoop 2.4; JDK is 1.7 64-bit): akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:164) at akka.actor.ActorCell.create(ActorCell.scala:596) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://sparkDriver/deadLetters), Path(/)] at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280) at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270) at akka.actor.ActorSelection.resolveOne(ActorSelection.scala:63) at akka.actor.ActorSelection.resolveOne(ActorSelection.scala:80) at org.apache.spark.util.AkkaUtils$.makeDriverRef(AkkaUtils.scala:221) at org.apache.spark.executor.Executor.startDriverHeartbeater(Executor.scala:393) at org.apache.spark.executor.Executor.<init>(Executor.scala:119) at org.apache.spark.scheduler.local.LocalActor.<init>(LocalBackend.scala:58) at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:107) at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:107) at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:343) at akka.actor.Props.newActor(Props.scala:252) at akka.actor.ActorCell.newActor(ActorCell.scala:552) at akka.actor.ActorCell.create(ActorCell.scala:578) ... 9 more
I have seen this error mentioned, but for Linux, not Windows: http://apache-spark-user-list.1001560.n3.nabble.com/Actor-not-found-td22265.html This one also doesn't seem to offer any resolution: https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/UqCYeUpgGCU My assumption is that this is related to some name resolution / IP conflict, but I'm not sure. One difference I did notice between my system and my friend's: when I ping localhost, I get 127.0.0.1; when he does, he gets ::1. I saw an issue about Spark having problems with IPv6 that was resolved only in 1.4. Is that related? https://issues.apache.org/jira/browse/SPARK-6440
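To compare the two machines' hosts files as suggested, a small Python sketch that lists which addresses the name localhost maps to, ignoring commented-out entries; the sample text is made up.

```python
def localhost_addresses(hosts_text):
    """Addresses mapped to the name 'localhost' in hosts-file text, in file order."""
    addresses = []
    for raw in hosts_text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and blank lines
        if not line:
            continue
        address, *names = line.split()
        if any(name.lower() == "localhost" for name in names):
            addresses.append(address)
    return addresses


sample = """# sample hosts file
127.0.0.1       localhost
::1             localhost
#192.168.0.10   localhost
"""
print(localhost_addresses(sample))  # ['127.0.0.1', '::1']
```

On each laptop, pass it the contents of the hosts file, for example localhost_addresses(open(r"C:\Windows\System32\drivers\etc\hosts").read()), and compare the two results.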
Re: PYTHONPATH on worker nodes
I don't think it's propagated automatically. Try this: spark-submit --conf spark.executorEnv.PYTHONPATH=... ... On Wed, Jun 10, 2015 at 8:15 AM, Bob Corsaro rcors...@gmail.com wrote: I'm setting PYTHONPATH before calling pyspark, but the worker nodes aren't inheriting it. I've tried looking through the code, and it appears that it should work; I can't find the bug. Here's an example. What am I doing wrong? https://gist.github.com/dokipen/84c4e4a89fddf702fdf1 -- Marcelo
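Marcelo's suggestion can be scripted so that the executors' PYTHONPATH is built from a list of directories. A Python sketch; the paths and job.py are hypothetical, and the separator should match the executors' OS (":" on Linux), not the client's. spark-submit's --py-files option, which ships .py/.zip/.egg files with the job, is an alternative when the modules are not already installed on the workers.

```python
def spark_submit_args(app, python_paths, separator=":"):
    """Build a spark-submit argument list that sets PYTHONPATH for the executors.

    `separator` must match the executors' OS (':' on Linux), not the client's.
    """
    value = separator.join(python_paths)
    return ["spark-submit", "--conf", f"spark.executorEnv.PYTHONPATH={value}", app]


args = spark_submit_args("job.py", ["/opt/libs", "/opt/shared/python"])
print(" ".join(args))
# spark-submit --conf spark.executorEnv.PYTHONPATH=/opt/libs:/opt/shared/python job.py
```

The list can be passed straight to subprocess.run(args), which avoids shell quoting issues with paths that contain spaces.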
Re: Can't access Ganglia on EC2 Spark cluster
Looks like the libphp version is 5.6 now. Which version of Spark are you using? Thanks Best Regards On Thu, Jun 11, 2015 at 3:46 AM, barmaley o...@solver.com wrote: Launching using the spark-ec2 script results in: Setting up ganglia RSYNC'ing /etc/ganglia to slaves... ... Shutting down GANGLIA gmond: [FAILED] Starting GANGLIA gmond: [ OK ] Shutting down GANGLIA gmond: [FAILED] Starting GANGLIA gmond: [ OK ] Connection to ... closed. ... Stopping httpd: [FAILED] Starting httpd: httpd: Syntax error on line 199 of /etc/httpd/conf/httpd.conf: Cannot load modules/libphp-5.5.so into server: /etc/httpd/modules/libphp-5.5.so: cannot open shared object file: No such file or directory [FAILED] [timing] ganglia setup: 00h 00m 03s Connection to ... closed. Spark standalone cluster started at ...:8080 Ganglia started at ...:5080/ganglia Done! However, when I run `netstat`, nothing is listening on port 5080. Is this related to the httpd error above, or is it something else? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-Ganglia-on-EC2-Spark-cluster-tp23266.html
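Akhil's diagnosis can be checked directly: the error says line 199 of /etc/httpd/conf/httpd.conf loads modules/libphp-5.5.so, which does not exist under /etc/httpd. A Python sketch that lists LoadModule lines pointing at missing files, assuming relative paths resolve against the ServerRoot (/etc/httpd here, as the error message implies). It only reports; pointing the directive at the libphp version that is actually installed is a separate, manual step.

```python
import os
import re

# Matches "LoadModule <name> <path>" directives; commented-out lines do not match.
LOAD_MODULE = re.compile(r"^\s*LoadModule\s+(\S+)\s+(\S+)", re.IGNORECASE)


def missing_modules(conf_text, server_root, exists=os.path.exists):
    """Return (line_number, module_name, path) for each LoadModule whose file is absent.

    Relative paths are resolved against server_root, like httpd's ServerRoot.
    """
    missing = []
    for line_number, line in enumerate(conf_text.splitlines(), start=1):
        match = LOAD_MODULE.match(line)
        if match is None:
            continue
        module_name, path = match.groups()
        full_path = path if os.path.isabs(path) else os.path.join(server_root, path)
        if not exists(full_path):
            missing.append((line_number, module_name, path))
    return missing
```

On the master this would be called as missing_modules(open("/etc/httpd/conf/httpd.conf").read(), "/etc/httpd").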
Re: Spark standalone mode and kerberized cluster
This might help: http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.4/Apache_Spark_Quickstart_v224/content/ch_installing-kerb-spark-quickstart.html Thanks Best Regards On Wed, Jun 10, 2015 at 6:49 PM, kazeborja kazebo...@gmail.com wrote: Hello all. I've been reading some old mails and noticed that using Kerberos in a standalone cluster was not supported. Is this still the case? Thanks. Borja. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-mode-and-kerberized-cluster-tp23255.html
Re: Fully in-memory shuffles
OK, so small shuffles can be done without hitting any disk. Is the same true for the auxiliary shuffle service in YARN? Can that be done without hitting disk? On Wed, Jun 10, 2015 at 9:17 PM, Patrick Wendell pwend...@gmail.com wrote: In many cases the shuffle will actually hit the OS buffer cache and never touch spinning disk, if its size is smaller than the memory on the machine. - Patrick On Wed, Jun 10, 2015 at 5:06 PM, Corey Nolet cjno...@gmail.com wrote: So with this... to help my understanding of Spark under the hood: is this statement correct? "When data needs to pass between multiple JVMs, a shuffle will always hit disk." On Wed, Jun 10, 2015 at 10:11 AM, Josh Rosen rosenvi...@gmail.com wrote: There's a discussion of this at https://github.com/apache/spark/pull/5403 On Wed, Jun 10, 2015 at 7:08 AM, Corey Nolet cjno...@gmail.com wrote: Is it possible to configure Spark to do all of its shuffling FULLY in memory (given that I have enough memory to store all the data)?
Spark not working on windows 7 64 bit
I've hit a roadblock trying to understand why Spark doesn't work for a colleague of mine on his Windows 7 laptop. I have pretty much the same setup, and everything works fine for me. I googled the error message and didn't find anything that resolved it. Here is the exception message (after running a vanilla Spark 1.3.1 installation prebuilt for Hadoop 2.4; JDK is 1.7 64-bit): akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:164) at akka.actor.ActorCell.create(ActorCell.scala:596) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://sparkDriver/deadLetters), Path(/)] at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280) at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270) at akka.actor.ActorSelection.resolveOne(ActorSelection.scala:63) at akka.actor.ActorSelection.resolveOne(ActorSelection.scala:80) at org.apache.spark.util.AkkaUtils$.makeDriverRef(AkkaUtils.scala:221) at org.apache.spark.executor.Executor.startDriverHeartbeater(Executor.scala:393) at org.apache.spark.executor.Executor.<init>(Executor.scala:119) at org.apache.spark.scheduler.local.LocalActor.<init>(LocalBackend.scala:58) at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:107) at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:107) at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:343) at akka.actor.Props.newActor(Props.scala:252) at akka.actor.ActorCell.newActor(ActorCell.scala:552) at akka.actor.ActorCell.create(ActorCell.scala:578) ... 9 more
I have seen this error mentioned, but for Linux, not Windows: http://apache-spark-user-list.1001560.n3.nabble.com/Actor-not-found-td22265.html This one also doesn't seem to offer any resolution: https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/UqCYeUpgGCU My assumption is that this is related to some name resolution / IP conflict, but I'm not sure.
One difference I did notice between my system and my friend's: when I ping localhost, I get 127.0.0.1; when he does, he gets ::1. I saw an issue about Spark having problems with IPv6 that was resolved only in 1.4. Is that related? https://issues.apache.org/jira/browse/SPARK-6440
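To compare how localhost resolves on the two machines (127.0.0.1 versus ::1), a small Python sketch that prints the addresses the system resolver returns, in order. The JVM may apply its own preferences on top of the OS resolver, so this is only a first comparison, not proof of what Spark sees.

```python
import socket


def resolved_addresses(host="localhost"):
    """Addresses returned by the system resolver for host, in resolver order, without duplicates."""
    ordered = []
    for _family, _type, _proto, _canonname, sockaddr in socket.getaddrinfo(host, None):
        if sockaddr[0] not in ordered:
            ordered.append(sockaddr[0])
    return ordered


print(resolved_addresses())
```

If one laptop lists ::1 first and the other 127.0.0.1, that matches the ping difference described above.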
PYTHONPATH on worker nodes
I'm setting PYTHONPATH before calling pyspark, but the worker nodes aren't inheriting it. I've tried looking through the code, and it appears that it should work; I can't find the bug. Here's an example. What am I doing wrong? https://gist.github.com/dokipen/84c4e4a89fddf702fdf1