Re: how to clean shuffle write each iteration
In ALS, I guess each iteration's RDDs are referenced by the next iteration's RDD, so none of the shuffle data will be deleted until the ALS job finishes. I guess checkpointing could solve my problem. Do you know about checkpointing?

On March 3, 2015, at 4:18 PM, nitin [via Apache Spark User List] ml-node+s1001560n21889...@n3.nabble.com wrote: Shuffle write will be cleaned if it is not referenced by any object directly/indirectly. There is a garbage collector written inside Spark which periodically checks for weak references to RDDs/shuffle writes/broadcasts and deletes them.
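A hedged sketch of what enabling checkpointing could look like (the HDFS paths are hypothetical, and whether ALS itself checkpoints intermediate RDDs depends on the Spark version in use):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")    // hypothetical path
val ratings = sc.textFile("hdfs:///data/ratings.csv")   // hypothetical input
  .map(_.split(",") match { case Array(u, p, r) => Rating(u.toInt, p.toInt, r.toDouble) })
// checkpointing truncates RDD lineage, which lets Spark's cleaner
// drop shuffle files belonging to finished iterations
val model = ALS.train(ratings, 10, 10, 0.01)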
RE: Supporting Hive features in Spark SQL Thrift JDBC server
Can you provide the detailed failure call stack? From: shahab [mailto:shahab.mok...@gmail.com] Sent: Tuesday, March 3, 2015 3:52 PM To: user@spark.apache.org Subject: Supporting Hive features in Spark SQL Thrift JDBC server Hi, According to the Spark SQL documentation, Spark SQL supports the vast majority of Hive features, such as User Defined Functions (UDFs), and one of these UDFs is the current_date() function, which should be supported. However, I get an error when I use this UDF in my SQL query. There are a couple of other UDFs which cause similar errors. Am I missing something in my JDBC server? /Shahab
Re: Exception while select into table.
Hi Yi, Thanks for your reply. 1. The version of Spark is 1.2.0 and the version of Hive is 0.10.0-cdh4.2.1. 2. The full stack trace of the exception:

15/03/03 13:41:30 INFO Client:
  client token: DUrrav1rAADCnhQzX_Ic6CMnfqcW2NIxra5n8824CRFZQVJOX0NMSUVOVF9UT0tFTgA
  diagnostics: User class threw exception: checkPaths: hdfs://longzhou-hdpnn.lz.dscc:11000/tmp/hive-hadoop/hive_2015-03-03_13-41-04_472_3573658402424030395-1/-ext-1 has nested directory hdfs://longzhou-hdpnn.lz.dscc:11000/tmp/hive-hadoop/hive_2015-03-03_13-41-04_472_3573658402424030395-1/-ext-1/attempt_201503031341_0057_m_003375_21951
  ApplicationMaster host: longzhou-hdp4.lz.dscc
  ApplicationMaster RPC port: 0
  queue: dt_spark
  start time: 1425361063973
  final status: FAILED
  tracking URL: longzhou-hdpnn.lz.dscc:12080/proxy/application_1421288865131_49822/history/application_1421288865131_49822
  user: dt

Exception in thread "main" org.apache.spark.SparkException: Application finished with failed status
  at org.apache.spark.deploy.yarn.ClientBase$class.run(ClientBase.scala:504)
  at org.apache.spark.deploy.yarn.Client.run(Client.scala:39)
  at org.apache.spark.deploy.yarn.Client$.main(Client.scala:143)
  at org.apache.spark.deploy.yarn.Client.main(Client.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:601)
  at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

It seems that you are right about the cause. Still, I am confused: why is the nested directory `hdfs://longzhou-hdpnn.lz.dscc:11000/tmp/hive-hadoop/hive_2015-03-03_13-41-04_472_3573658402424030395-1/-ext-1/attempt_201503031341_0057_m_003375_21951` and not the path which |bak_startup_log_uid_20150227| points to? What's in `/tmp/hive-hadoop`? What is it used for? It seems that there are a huge number of files in this directory. Thanks.

On 2015-03-03 14:43, Yi Tian wrote: Hi, Some suggestions: 1. You should tell us the versions of Spark and Hive you are using. 2. You should paste the full stack trace of the exception. In this case, I guess you have a nested directory in the path which |bak_startup_log_uid_20150227| points to, and the config field |hive.mapred.supports.subdirectories| is |false| by default.
so…

if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES) && item.isDir()) {
  throw new HiveException("checkPaths: " + src.getPath() + " has nested directory " + itemSource);
}

On 3/3/15 14:36, LinQili wrote: Hi all, I was doing a select using Spark SQL like:

insert into table startup_log_uid_20150227
select * from bak_startup_log_uid_20150227
where login_time > 1425027600

Usually, it got an exception:

org.apache.hadoop.hive.ql.metadata.Hive.checkPaths(Hive.java:2157)
org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2298)
org.apache.hadoop.hive.ql.metadata.Table.copyFiles(Table.java:686)
org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1469)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:243)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:137)
org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:51)
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
com.nd.home99.LogsProcess$anonfun$main$1$anonfun$apply$1.apply(LogsProcess.scala:286)
com.nd.home99.LogsProcess$anonfun$main$1$anonfun$apply$1.apply(LogsProcess.scala:83)
scala.collection.immutable.List.foreach(List.scala:318)
com.nd.home99.LogsProcess$anonfun$main$1.apply(LogsProcess.scala:83)
com.nd.home99.LogsProcess$anonfun$main$1.apply(LogsProcess.scala:82)
scala.collection.immutable.List.foreach(List.scala:318)
com.nd.home99.LogsProcess$.main(LogsProcess.scala:82)
com.nd.home99.LogsProcess.main(LogsProcess.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:601)
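If fixing the nested output directory itself is not an option, a hedged workaround sketch is to flip the relevant Hive flags, assuming they are honored by this Spark/Hive combination:

// set the flag Yi's code excerpt checks, plus the recursive-input flag it is commonly paired with
hiveContext.sql("SET hive.mapred.supports.subdirectories=true")
hiveContext.sql("SET mapred.input.dir.recursive=true")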
Re: how to clean shuffle write each iteration
Shuffle write will be cleaned if it is not referenced by any object directly/indirectly. There is a garbage collector written inside Spark which periodically checks for weak references to RDDs/shuffle writes/broadcasts and deletes them.
SparkSQL, executing an OR
I'm trying to execute a query with Spark. (Example from the Spark documentation)

val teenagers = people.where('age >= 10).where('age <= 19).select('name)

Is it possible to execute an OR with this syntax?

val teenagers = people.where('age >= 10 or 'age <= 4).where('age <= 19).select('name)

I have tried different ways and I didn't get it.
Re: GraphX path traversal
Hi, Could you please let me know how to do this? (or) Any suggestions? Regards, Rajesh

On Mon, Mar 2, 2015 at 4:47 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi, I have the below edge list. How can I find the parent path for every vertex?

Example:
Vertex 1 path: 2, 3, 4, 5, 6
Vertex 2 path: 3, 4, 5, 6
Vertex 3 path: 4, 5, 6
Vertex 4 path: 5, 6
Vertex 5 path: 6

Could you please let me know how to do this? (or) Any suggestions?

Source Vertex   Destination Vertex
1               2
2               3
3               4
4               5
5               6

Regards, Rajesh
RE: SparkSQL, executing an OR
Use where('age >= 10 || 'age <= 4) instead. -----Original Message----- From: Guillermo Ortiz [mailto:konstt2...@gmail.com] Sent: Tuesday, March 3, 2015 5:14 PM To: user Subject: SparkSQL, executing an OR I'm trying to execute a query with Spark. (Example from the Spark documentation)

val teenagers = people.where('age >= 10).where('age <= 19).select('name)

Is it possible to execute an OR with this syntax?

val teenagers = people.where('age >= 10 or 'age <= 4).where('age <= 19).select('name)

I have tried different ways and I didn't get it.
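In the Catalyst symbol DSL the disjunction operator is ||, and the Spark 1.3 DataFrame API exposes the same operator on Column. A sketch, assuming a table named people as in the question:

// Spark 1.2-era symbol DSL
val teenagers = people.where('age >= 10 || 'age <= 4).select('name)
// Spark 1.3 DataFrame API equivalent
val teenagers13 = people.where(people("age") >= 10 || people("age") <= 4).select("name")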
RE: Supporting Hive features in Spark SQL Thrift JDBC server
Hive UDFs are only applicable for HiveContext and its subclass instances; is the CassandraAwareSQLContext a direct subclass of HiveContext or of SQLContext? From: shahab [mailto:shahab.mok...@gmail.com] Sent: Tuesday, March 3, 2015 5:10 PM To: Cheng, Hao Cc: user@spark.apache.org Subject: Re: Supporting Hive features in Spark SQL Thrift JDBC server

val sc: SparkContext = new SparkContext(conf)
val sqlCassContext = new CassandraAwareSQLContext(sc) // I used the Calliope Cassandra Spark connector
val rdd: SchemaRDD = sqlCassContext.sql("select * from db.profile")
rdd.cache
rdd.registerTempTable("profile")
rdd.first // enforce caching
val q = "select from_unixtime(floor(createdAt/1000)) from profile where sampling_bucket=0"
val rdd2 = rdd.sqlContext.sql(q)
println("Result: " + rdd2.first)

And I get the following errors:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'from_unixtime('floor(('createdAt / 1000))) AS c0#7, tree: Project ['from_unixtime('floor(('createdAt / 1000))) AS c0#7] Filter (sampling_bucket#10 = 0) Subquery profile Project [company#8,bucket#9,sampling_bucket#10,profileid#11,createdat#12L,modifiedat#13L,version#14] CassandraRelation localhost, 9042, 9160, normaldb_sampling, profile, org.apache.spark.sql.CassandraAwareSQLContext@778b692d, None, None, false, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:402) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:402) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:403) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:403) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:407) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:405) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:411) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at org.apache.spark.sql.SchemaRDD.take(SchemaRDD.scala:440) at
Re: Some questions after playing a little with the new ml.Pipeline.
Here is my current implementation with the current master version of Spark:

class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

  override def transformSchema(...) { ... }

  override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
    transformSchema(dataSet.schema, paramMap, logging = true)
    val map = this.paramMap ++ paramMap
    val deepCNNFeature = udf((v: Vector) => {
      val cnnModel = new CaffeModel
      cnnModel.transform(v)
    }: Vector)
    dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
  }
}

where CaffeModel is a Java API to the Caffe C++ model. The problem here is that for every row it will create a new instance of CaffeModel, which is inefficient since creating a new model means loading a large model file, and it will transform only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that operates on a partition, in order to minimize the creation of CaffeModels and to take advantage of the Caffe network's batch processing?

On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley jos...@databricks.com wrote: I see, thanks for clarifying! I'd recommend following existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like "select ... from ...". I'm recommending looking at the existing implementation (rather than stating it here) because it changes between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column. Joseph

On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

class DeepCNNFeature extends Transformer ... {
  override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
    // How can I do a mapPartitions on the underlying RDD and then add the column?
  }
}

On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi Joseph, Thank you for the tips. I understand what I should do when my data are represented as an RDD. The thing that I can't figure out is how to do the same thing when the data is viewed as a DataFrame and I need to add the result of my pretrained model as a new column in the DataFrame. Precisely, I want to implement the following transformer:

class DeepCNNFeature extends Transformer ... { }

On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley jos...@databricks.com wrote: Hi Jao, You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.:

myRDD.map { myDatum => preTrainedModel.predict(myDatum) }

If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster. For the second question, I would modify the above call as follows:

myRDD.mapPartitions { myDataOnPartition =>
  val myModel = // instantiate neural network on this partition
  myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
}

I hope this helps! Joseph

On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, We mainly do large scale computer vision tasks (image classification, retrieval, ...). The pipeline is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest Spark Summit ( http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html ) using the master version of the Spark pipeline and dataframe. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even though the tutorial is straightforward to reproduce with this new API, we still have some questions:

- Can one use external tools (e.g. via pipe) as a pipeline stage? An example use case is to extract features learned with a convolutional neural network. In our case, this corresponds to a pre-trained neural network with the Caffe library (http://caffe.berkeleyvision.org/).

- The second question is about the performance of the pipeline. A library such as Caffe processes the data in batch, and instantiating one Caffe network can be time consuming when this network is very deep. So we can gain performance if we minimize the number of Caffe network creations.
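A hedged sketch of Joseph's mapPartitions suggestion applied to this use case, reusing one model per partition and feeding Caffe in batches (CaffeModel is the poster's own wrapper; the batch size of 64 and a transformBatch method are assumptions, not a real API):

// featureRdd: RDD[Vector] extracted from the input column (assumed)
val transformed = featureRdd.mapPartitions { rows =>
  val cnnModel = new CaffeModel            // loaded once per partition, not once per row
  rows.grouped(64).flatMap { batch =>      // hypothetical batch size
    cnnModel.transformBatch(batch)         // hypothetical batch API on the wrapper
  }
}

Joining the transformed values back as a DataFrame column is left out here; inside a UDF this per-partition reuse is not expressible, which is the crux of the question.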
LATERAL VIEW explode requests the full schema
I use LATERAL VIEW explode(...) to read data from a parquet file, but the full schema is requested by parquet instead of just the used columns. When I don't use LATERAL VIEW, the requested schema has just the two columns which I use. Is this correct, is there room for an optimization, or do I understand something wrong? Here are my examples:

1) hiveContext.sql("SELECT userid FROM pef WHERE observeddays==20140509")

The requested schema is:

optional group observedDays (LIST) {
  repeated int32 array;
}
required int64 userid;

This is what I expect, although the result does not work, but that is not the problem here!

2) hiveContext.sql("SELECT userid FROM pef LATERAL VIEW explode(observeddays) od AS observed WHERE observed==20140509")

The requested schema is:

required int64 userid;
optional int32 source;
optional group observedDays (LIST) {
  repeated int32 array;
}
optional group placetobe (LIST) {
  repeated group bag {
    optional group array {
      optional binary palces (UTF8);
      optional group dates (LIST) {
        repeated int32 array;
      }
    }
  }
}

Why does parquet request the full schema? I just use two fields of the table. Can somebody please explain to me why this can happen. Thanks!
RE: insert Hive table with RDD
Use the SchemaRDD / DataFrame API via HiveContext. Assuming you're using the latest code, something like:

val hc = new HiveContext(sc)
import hc.implicits._
existedRdd.toDF().insertInto("hivetable")

or

existedRdd.toDF().registerTempTable("mydata")
hc.sql("insert into table hivetable select xxx from mydata")

-----Original Message----- From: patcharee [mailto:patcharee.thong...@uni.no] Sent: Tuesday, March 3, 2015 7:09 PM To: user@spark.apache.org Subject: insert Hive table with RDD Hi, How can I insert into an existing Hive table from an RDD containing my data? Any examples? Best, Patcharee
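A slightly fuller sketch under the same assumptions (Spark 1.3-era API, and a pre-existing Hive table hivetable whose schema matches the case class; both names are hypothetical):

import org.apache.spark.sql.hive.HiveContext

case class Record(id: Int, name: String)

val hc = new HiveContext(sc)
import hc.implicits._

sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))
  .toDF()
  .insertInto("hivetable")  // appends the rows to the existing Hive table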
Re: Running Spark jobs via oozie
I am also starting to work on this one. Did you get any solution to this issue?
Re: GraphX path traversal
Hi Robin, Thank you for your response. Please find my question below. I have the below edge file:

Source Vertex   Destination Vertex
1               2
2               3
3               4
4               5
5               6
6               6

In this graph, the 1st vertex is connected to the 2nd vertex, the 2nd vertex is connected to the 3rd vertex, ..., and the 6th vertex is connected to the 6th vertex, so the 6th vertex is the root node.

[figure: the six vertices chained 1 -> 2 -> 3 -> 4 -> 5 -> 6, with a self-loop at 6]

In this graph, how can I compute the 1st vertex's parents, i.e. 2, 3, 4, 5, 6? Similarly, the 2nd vertex's parents are 3, 4, 5, 6, and the 6th vertex's parent is 6 because it is the root node. I'm planning to use the Pregel API, but I'm not able to define the messages and the vertex program in that API. Could you please help me with this? Please let me know if you need more information. Regards, Rajesh

On Tue, Mar 3, 2015 at 8:15 PM, Robin East robin.e...@xense.co.uk wrote: Rajesh, I'm not sure if I can help you; however, I don't even understand the question. Could you restate what you are trying to do? Sent from my iPhone

On 2 Mar 2015, at 11:17, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi, I have the below edge list. How can I find the parent path for every vertex? Example: Vertex 1 path: 2, 3, 4, 5, 6. Vertex 2 path: 3, 4, 5, 6. Vertex 3 path: 4, 5, 6. Vertex 4 path: 5, 6. Vertex 5 path: 6. Could you please let me know how to do this? (or) Any suggestions?

Source Vertex   Destination Vertex
1               2
2               3
3               4
4               5
5               6

Regards, Rajesh
Re: On app upgrade, restore sliding window data.
Thank you Arush, I've implemented initial data for a windowed operation and opened a pull request here: https://github.com/apache/spark/pull/4875

On Tue, Feb 24, 2015 at 4:49 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: I think this could be of some help to you: https://issues.apache.org/jira/browse/SPARK-3660

On Tue, Feb 24, 2015 at 2:18 AM, Matus Faro matus.f...@kik.com wrote: Hi, Our application is being designed to operate at all times on a large sliding window (day+) of data. The operations performed on the window of data will change fairly frequently, and I need a way to save and restore the sliding window after an app upgrade without having to wait the duration of the sliding window to warm up. Because it's an app upgrade, checkpointing will not work, unfortunately. I can potentially dump the window to outside storage periodically or on app shutdown, but I don't have an ideal way of restoring it. I thought about two non-ideal solutions: 1. Load the previous data all at once into the sliding window on app startup. The problem is, at one point I will have double the data in the sliding window until the initial batch of data goes out of scope. 2. Broadcast the previous state of the window separately from the window, and perform the operations on both sets of data until it comes out of scope. The problem is, the data will not fit into memory. Solutions that would solve my problem: 1. The ability to pre-populate the sliding window. 2. Having control over batch slicing. It would be nice for a Receiver to dictate the current batch timestamp in order to slow down or fast forward time. Any feedback would be greatly appreciated! Thank you, Matus

-- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
insert Hive table with RDD
Hi, How can I insert into an existing Hive table from an RDD containing my data? Any examples? Best, Patcharee
Re: gc time too long when using mllib als
You need to increase the parallelism / repartition the data to a higher number to get rid of those. Thanks Best Regards

On Tue, Mar 3, 2015 at 2:26 PM, lisendong lisend...@163.com wrote: Why is the GC time so long? I'm using ALS in MLlib, and the garbage collection time is too long (about 1/3 of the total time). I have tried some measures from the Spark tuning guide, and tried to set the new generation memory size, but it still does not work...

Tasks:

Index  ID    Status   Locality       Executor  Launch Time          Duration  GC Time  Shuffle Read  Write Time  Shuffle Write
1      2801  SUCCESS  PROCESS_LOCAL  h1.zw     2015/03/03 16:35:15  8.6 min   3.3 min  1238.3 MB     57 ms       69.2 MB
0      2800  SUCCESS  PROCESS_LOCAL  h11.zw    2015/03/03 16:35:15  6.0 min   1.1 min  1261.0 MB     55 ms       68.6 MB
2      2802  SUCCESS  PROCESS_LOCAL  h9.zw     2015/03/03 16:35:15  5.0 min   1.5 min  834.4 MB      60 ms       69.6 MB
4      2804  SUCCESS  PROCESS_LOCAL  h4.zw     2015/03/03 16:35:15  4.4 min   59 s     689.8 MB      62 ms       71.4 MB
3      2803  SUCCESS  PROCESS_LOCAL  h8.zw     2015/03/03 16:35:15  4.2 min   1.6 min  803.6 MB      66 ms       71.5 MB
7      2807  SUCCESS  PROCESS_LOCAL  h6.zw     2015/03/03 16:35:15  4.3 min   1.4 min  733.1 MB      9 s         66.5 MB
6      2806  SUCCESS  PROCESS_LOCAL  h10.zw    2015/03/03 16:35:15  6.4 min   3.1 min  950.5 MB      68 ms       69.3 MB
5      2805  SUCCESS  PROCESS_LOCAL  h3.zw     2015/03/03 16:35:15  8.0 min   2.7 min  1132.0 MB     64 ms       70.3 MB
8      2808  SUCCESS  PROCESS_LOCAL  h12.zw    2015/03/03 16:35:15  4.5 min   2.2 min  1304.2 MB     60 ms       69.4 MB
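A hedged sketch of what that could look like for MLlib ALS, where the blocks argument also controls the partitioning inside the algorithm (the numbers are placeholders to tune):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val ratings = rawRatings.repartition(200)         // rawRatings: RDD[Rating], assumed
// rank = 10, iterations = 10, lambda = 0.01, blocks = 200
val model = ALS.train(ratings, 10, 10, 0.01, 200)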
Re: RDD partitions per executor in Cassandra Spark Connector
Hi, is there a paper or a document where one can read how Spark reads Cassandra data in parallel, and how it writes data back from RDDs? It's a bit hard to form a clear picture in one's mind. Thank you, Pavel Velikhov

On Mar 3, 2015, at 1:08 AM, Rumph, Frens Jan m...@frensjan.nl wrote: Hi all, I didn't find the issues button on https://github.com/datastax/spark-cassandra-connector/ so posting here. Does anyone have an idea why token ranges are grouped into one partition per executor? I expected at least one per core. Any suggestions on how to work around this? Doing a repartition is way too expensive, as I just want more partitions for parallelism, not a reshuffle... Thanks in advance! Frens Jan
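One hedged workaround sketch, using the connector's split-size knob so each token range yields more, smaller input partitions (the property name and semantics should be verified against the connector version in use):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "127.0.0.1")  // placeholder host
  .set("spark.cassandra.input.split.size", "10000")     // assumption: fewer rows per split => more partitions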
Re: One of the executor not getting StopExecutor message
Hi, The operations are not very extensive, as this scenario is not always reproducible. One of the executors starts behaving in this manner. For this particular application, we are using 8 cores in one executor, and practically, 4 executors are launched on one machine. This machine has a good config with respect to the number of cores. Somehow, it seems to me to be some Akka communication issue. If I try to take a thread dump of the executor once it appears to be in trouble, a timeout happens. Can it be something related to spark.akka.threads?

On Fri, Feb 27, 2015 at 3:55 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Mostly, that particular executor is stuck on a GC pause. What operation are you performing? You can try increasing the parallelism if you see only 1 executor is doing the task. Thanks Best Regards

On Fri, Feb 27, 2015 at 11:39 AM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, I am running a Spark application on YARN in cluster mode. One of my executors appears to be in a hang state for a long time, and finally gets killed by the driver. Compared to other executors, it has not received the StopExecutor message from the driver. Here are the logs at the end of this container (C_1):

15/02/26 18:17:07 DEBUG storage.BlockManagerSlaveActor: Done removing broadcast 36, response is 2
15/02/26 18:17:07 DEBUG storage.BlockManagerSlaveActor: Sent response: 2 to Actor[akka.tcp://sparkDriver@TMO-DN73:37906/temp/$aB]
15/02/26 18:17:09 DEBUG ipc.Client: IPC Client (1206963429) connection to TMO-GCR70/192.168.162.70:9000 from admin: closed
15/02/26 18:17:09 DEBUG ipc.Client: IPC Client (1206963429) connection to TMO-GCR70/192.168.162.70:9000 from admin: stopped, remaining connections 0
15/02/26 18:17:32 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for [] with renew id 1 executed
15/02/26 18:18:00 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for [] with renew id 1 expired
15/02/26 18:18:00 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for [] with renew id 1 exited
15/02/26 20:33:13 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM

NOTE that it has no logs for more than 2 hrs. Here are the logs at the end of a normal container (C_2):

15/02/26 20:33:09 DEBUG storage.BlockManagerSlaveActor: Sent response: 2 to Actor[akka.tcp://sparkDriver@TMO-DN73:37906/temp/$D+b]
15/02/26 20:33:10 DEBUG executor.CoarseGrainedExecutorBackend: [actor] received message StopExecutor from Actor[akka.tcp://sparkDriver@TMO-DN73:37906/user/CoarseGrainedScheduler#160899257]
15/02/26 20:33:10 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
15/02/26 20:33:10 INFO storage.MemoryStore: MemoryStore cleared
15/02/26 20:33:10 INFO storage.BlockManager: BlockManager stopped
15/02/26 20:33:10 DEBUG executor.CoarseGrainedExecutorBackend: [actor] handled message (181.499835 ms) StopExecutor from Actor[akka.tcp://sparkDriver@TMO-DN73:37906/user/CoarseGrainedScheduler#160899257]
15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/02/26 20:33:10 DEBUG ipc.Client: stopping client from cache: org.apache.hadoop.ipc.Client@76a68bd4
15/02/26 20:33:10 DEBUG ipc.Client: stopping client from cache: org.apache.hadoop.ipc.Client@76a68bd4
15/02/26 20:33:10 DEBUG ipc.Client: removing client from cache: org.apache.hadoop.ipc.Client@76a68bd4
15/02/26 20:33:10 DEBUG ipc.Client: stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@76a68bd4
15/02/26 20:33:10 DEBUG ipc.Client: Stopping client
15/02/26 20:33:10 DEBUG storage.DiskBlockManager: Shutdown hook called
15/02/26 20:33:10 DEBUG util.Utils: Shutdown hook called

At the driver side, I can see the logs related to heartbeat messages from C_1 till 20:05:00:

15/02/26 20:05:00 DEBUG spark.HeartbeatReceiver: [actor] received message Heartbeat(7,[Lscala.Tuple2;@151e5ce6,BlockManagerId(7, TMO-DN73, 34106)) from Actor[akka.tcp://sparkExecutor@TMO-DN73:43671/temp/$fn]

After this, it continues to receive the heartbeat from other executors except this one, and here follows the message responsible for its SIGTERM:

15/02/26 20:06:20 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, TMO-DN73, 34106) with no recent heart beats: 80515ms exceeds 45000ms

I am using Spark
delay between removing the block manager of an executor, and marking that as lost
Hi, Is there any relation between removing the block manager of an executor and marking that executor as lost? In my setup, even after removing the block manager (after failing to do some operation), it is taking more than 20 mins to mark that executor as lost. Following are the logs:

15/03/03 10:26:49 WARN storage.BlockManagerMaster: Failed to remove broadcast 20 with removeFromMaster = true - Ask timed out on [Actor[akka.tcp://sparkExecutor@TMO-DN73:54363/user/BlockManagerActor1#-966525686]] after [3 ms]}
15/03/03 10:27:41 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(1, TMO-DN73, 4) with no recent heart beats: 76924ms exceeds 45000ms
15/03/03 10:27:41 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(1, TMO-DN73, 4)
15/03/03 10:49:10 ERROR cluster.YarnClusterScheduler: Lost executor 1 on TMO-DN73: remote Akka client disassociated

How can I make this happen faster? Thanks, Twinkle
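The delay is governed by a handful of timeout settings; a hedged sketch of tightening them (the property names follow the Spark 1.2-era configuration docs and should be verified against the version in use):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.storage.blockManagerSlaveTimeoutMs", "30000")  // assumption: expire silent block managers sooner
  .set("spark.akka.timeout", "60")                           // assumption: Akka ask timeout, in seconds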
Re: Supporting Hive features in Spark SQL Thrift JDBC server
Regarding current_date, I think it is in neither Hive 0.12.0 nor 0.13.1 (the versions that we support). It seems https://issues.apache.org/jira/browse/HIVE-5472 added it to Hive recently.

On Tue, Mar 3, 2015 at 6:03 AM, Cheng, Hao hao.ch...@intel.com wrote: The temp table in the metastore can not be shared across SQLContext instances. Since HiveContext is a subclass of SQLContext (it inherits all of its functionality), why not use a single HiveContext globally? Is there any specific requirement in your case that you need multiple SQLContexts/HiveContexts?

From: shahab [mailto:shahab.mok...@gmail.com] Sent: Tuesday, March 3, 2015 9:46 PM To: Cheng, Hao Cc: user@spark.apache.org Subject: Re: Supporting Hive features in Spark SQL Thrift JDBC server You are right, CassandraAwareSQLContext is a subclass of SQLContext. But I did another experiment: I queried Cassandra using CassandraAwareSQLContext, then I registered the RDD as a temp table; next I tried to query it using HiveContext, but it seems that the Hive context can not see the table registered using the SQL context. Is this a normal case? best, /Shahab

On Tue, Mar 3, 2015 at 1:35 PM, Cheng, Hao hao.ch...@intel.com wrote: Hive UDFs are only applicable for HiveContext and its subclass instances; is the CassandraAwareSQLContext a direct subclass of HiveContext or of SQLContext?

From: shahab [mailto:shahab.mok...@gmail.com] Sent: Tuesday, March 3, 2015 5:10 PM To: Cheng, Hao Cc: user@spark.apache.org Subject: Re: Supporting Hive features in Spark SQL Thrift JDBC server

val sc: SparkContext = new SparkContext(conf)
val sqlCassContext = new CassandraAwareSQLContext(sc) // I used the Calliope Cassandra Spark connector
val rdd: SchemaRDD = sqlCassContext.sql("select * from db.profile")
rdd.cache
rdd.registerTempTable("profile")
rdd.first // enforce caching
val q = "select from_unixtime(floor(createdAt/1000)) from profile where sampling_bucket=0"
val rdd2 = rdd.sqlContext.sql(q)
println("Result: " + rdd2.first)

And I get the following errors:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'from_unixtime('floor(('createdAt / 1000))) AS c0#7, tree: Project ['from_unixtime('floor(('createdAt / 1000))) AS c0#7] Filter (sampling_bucket#10 = 0) Subquery profile Project [company#8,bucket#9,sampling_bucket#10,profileid#11,createdat#12L,modifiedat#13L,version#14] CassandraRelation localhost, 9042, 9160, normaldb_sampling, profile, org.apache.spark.sql.CassandraAwareSQLContext@778b692d, None, None, false, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34) at
Re: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: pyspark on yarn
Hi Sam: Shouldn't you define the table schema? I had the same problem in Scala, and then I solved it by defining the schema. I did this:

sqlContext.applySchema(dataRDD, tableSchema).registerTempTable(tableName)

Hope it helps.

On Mon, Jan 5, 2015 at 7:01 PM, Sam Flint sam.fl...@magnetic.com wrote: Below is the code that I am running. I get an error for unresolved attributes. Can anyone point me in the right direction? Running from the pyspark shell using YARN: MASTER=yarn-client pyspark. The error is below the code:

# Import SQLContext and data types
from pyspark.sql import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# The result of loading a parquet file is also a SchemaRDD.
# Try loading all data that you have
parquetFile = sqlContext.parquetFile("/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.0.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.1.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.10.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.11.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.2.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.3.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.4.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.5.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.6.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.7.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.8.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.9.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.0.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.1.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.2.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.3.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.4.parq")

# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFileone")
results = sqlContext.sql("SELECT * FROM parquetFileone where key=20141001")

# print results
for result in results.collect():
    print result

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/pyspark/sql.py", line 1615, in collect
    rows = RDD.collect(self)
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/pyspark/rdd.py", line 678, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/pyspark/sql.py", line 1527, in _jrdd
    self._lazy_jrdd = self._jschema_rdd.javaToPython()
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o29.javaToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: Project [*] Filter ('key = 20141001) Subquery parquetFileone ParquetRelation
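In Scala, the fix the reply suggests looks roughly like the following sketch (the field names are hypothetical; use the parquet file's real columns):

import org.apache.spark.sql._

val tableSchema = StructType(Seq(
  StructField("key", IntegerType, nullable = true),
  StructField("userid", LongType, nullable = true)))

// rawRDD: RDD[(Int, Long)], assumed to hold the raw values
val rowRDD = rawRDD.map { case (k, u) => Row(k, u) }
sqlContext.applySchema(rowRDD, tableSchema).registerTempTable("parquetFileone")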
Re: PRNG in Scala
And this SO post goes into details on the PRNG in Java: http://stackoverflow.com/questions/9907303/does-java-util-random-implementation-differ-between-jres-or-platforms

On 3 Mar 2015, at 16:15, Robin East robin.e...@xense.co.uk wrote: This is more of a Java/Scala question than a Spark one - it uses java.util.Random: https://github.com/scala/scala/blob/2.11.x/src/library/scala/util/Random.scala

On 3 Mar 2015, at 15:08, Vijayasarathy Kannan kvi...@vt.edu wrote: Hi, What pseudo-random-number generator does scala.util.Random use?
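Since scala.util.Random is a thin wrapper over java.util.Random, two instances with the same seed produce identical sequences, which is easy to check:

val s = new scala.util.Random(42L)   // delegates to an internal java.util.Random(42L)
val j = new java.util.Random(42L)
assert(s.nextInt() == j.nextInt())
assert(s.nextDouble() == j.nextDouble())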
Issue with yarn cluster - hangs in accepted state.
I am trying to run the Java class below on a YARN cluster, but it hangs in the ACCEPTED state. I don't see any error. Below are the class and the command. Any help is appreciated. Thanks, Abhi

bin/spark-submit --class com.mycompany.app.SimpleApp --master yarn-cluster /home/hduser/my-app-1.0.jar

{code}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
    public static void main(String[] args) {
        String logFile = "/home/hduser/testspark.txt"; // Should be some file on your system
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logData = sc.textFile(logFile).cache();

        long numAs = logData.filter(new Function<String, Boolean>() {
            public Boolean call(String s) { return s.contains("a"); }
        }).count();

        long numBs = logData.filter(new Function<String, Boolean>() {
            public Boolean call(String s) { return s.contains("b"); }
        }).count();

        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    }
}
{code}

15/03/03 11:47:40 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:41 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
[the same report repeats every second]
15/03/03 11:48:04 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
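An application stuck in the ACCEPTED state usually means YARN has not yet found room to launch the ApplicationMaster container (queue limits or insufficient memory/vcores). A hedged variant of the same command that requests smaller, explicit resources to test this:

bin/spark-submit --class com.mycompany.app.SimpleApp --master yarn-cluster \
  --driver-memory 512m --executor-memory 512m --executor-cores 1 --num-executors 2 \
  /home/hduser/my-app-1.0.jar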
PRNG in Scala
Hi, What pseudo-random-number generator does scala.util.Random use?
Re: GraphX path traversal
Hi, I have tried the below program using the Pregel API, but I'm not able to get my required output. I'm getting exactly the reverse of the output which I'm expecting.

// Creating the graph using the edge file mentioned in the above mail
val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "/home/rajesh/Downloads/graphdata/data.csv").cache()

val parentGraph = Pregel(
  graph.mapVertices((id, attr) => Set[VertexId]()),
  Set[VertexId](),
  Int.MaxValue,
  EdgeDirection.Out)(
  (id, attr, msg) => (msg ++ attr),
  edge => {
    if (edge.srcId != edge.dstId) {
      Iterator((edge.dstId, (edge.srcAttr + edge.srcId)))
    } else Iterator.empty
  },
  (a, b) => (a ++ b))

parentGraph.vertices.collect.foreach(println(_))

Output:

(4,Set(1, 2, 3))
(1,Set())
(6,Set(5, 1, 2, 3, 4))
(3,Set(1, 2))
(5,Set(1, 2, 3, 4))
(2,Set(1))

But I'm looking for the below output:

(4,Set(5, 6))
(1,Set(2, 3, 4, 5, 6))
(6,Set())
(3,Set(4, 5, 6))
(5,Set(6))
(2,Set(3, 4, 5, 6))

Could you please point out where I'm going wrong? Regards, Rajesh

On Tue, Mar 3, 2015 at 8:42 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Robin, Thank you for your response. Please find my question below. I have the below edge file:

Source Vertex   Destination Vertex
1               2
2               3
3               4
4               5
5               6
6               6

In this graph, the 1st vertex is connected to the 2nd vertex, the 2nd vertex is connected to the 3rd vertex, ..., and the 6th vertex is connected to the 6th vertex, so the 6th vertex is the root node.

[figure: the six vertices chained 1 -> 2 -> 3 -> 4 -> 5 -> 6, with a self-loop at 6]

In this graph, how can I compute the 1st vertex's parents, i.e. 2, 3, 4, 5, 6? Similarly, the 2nd vertex's parents are 3, 4, 5, 6, and the 6th vertex's parent is 6 because it is the root node. I'm planning to use the Pregel API, but I'm not able to define the messages and the vertex program in that API. Could you please help me with this? Please let me know if you need more information. Regards, Rajesh

On Tue, Mar 3, 2015 at 8:15 PM, Robin East robin.e...@xense.co.uk wrote: Rajesh, I'm not sure if I can help you; however, I don't even understand the question. Could you restate what you are trying to do? Sent from my iPhone

On 2 Mar 2015, at 11:17, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi, I have the below edge list. How can I find the parent path for every vertex? Example: Vertex 1 path: 2, 3, 4, 5, 6. Vertex 2 path: 3, 4, 5, 6. Vertex 3 path: 4, 5, 6. Vertex 4 path: 5, 6. Vertex 5 path: 6. Could you please let me know how to do this? (or) Any suggestions?

Source Vertex   Destination Vertex
1               2
2               3
3               4
4               5
5               6

Regards, Rajesh
Re: GraphX path traversal
Have you tried EdgeDirection.In?

On 3 Mar 2015, at 16:32, Robin East robin.e...@xense.co.uk wrote: What about the following, which can be run in the Spark shell:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val vertexlist = Array((1L, "One"), (2L, "Two"), (3L, "Three"), (4L, "Four"), (5L, "Five"), (6L, "Six"))
val edgelist = Array(Edge(6, 5, "6 to 5"), Edge(5, 4, "5 to 4"), Edge(4, 3, "4 to 3"), Edge(3, 2, "3 to 2"), Edge(2, 1, "2 to 1"))
val vertices: RDD[(VertexId, String)] = sc.parallelize(vertexlist)
val edges = sc.parallelize(edgelist)
val graph = Graph(vertices, edges)
val triplets = graph.triplets
triplets.foreach(t => println(s"parent for ${t.dstId} is ${t.srcId}"))

It doesn't set vertex 6 to have parent 6, but you get the idea. It doesn't use Pregel, but that sounds like overkill for what you are trying to achieve. Does that answer your question or were you after something different?

On 3 Mar 2015, at 15:12, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Robin, Thank you for your response. Please find my question below. I have the below edge file:

Source Vertex   Destination Vertex
1               2
2               3
3               4
4               5
5               6
6               6

In this graph, the 1st vertex is connected to the 2nd vertex, the 2nd vertex is connected to the 3rd vertex, ..., and the 6th vertex is connected to the 6th vertex, so the 6th vertex is the root node.

[figure: the six vertices chained 1 -> 2 -> 3 -> 4 -> 5 -> 6, with a self-loop at 6]

In this graph, how can I compute the 1st vertex's parents, i.e. 2, 3, 4, 5, 6? Similarly, the 2nd vertex's parents are 3, 4, 5, 6, and the 6th vertex's parent is 6 because it is the root node. I'm planning to use the Pregel API, but I'm not able to define the messages and the vertex program in that API. Could you please help me with this? Please let me know if you need more information. Regards, Rajesh

On Tue, Mar 3, 2015 at 8:15 PM, Robin East robin.e...@xense.co.uk wrote: Rajesh, I'm not sure if I can help you; however, I don't even understand the question. Could you restate what you are trying to do? Sent from my iPhone

On 2 Mar 2015, at 11:17, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi, I have the below edge list. How can I find the parent path for every vertex? Example: Vertex 1 path: 2, 3, 4, 5, 6. Vertex 2 path: 3, 4, 5, 6. Vertex 3 path: 4, 5, 6. Vertex 4 path: 5, 6. Vertex 5 path: 6. Could you please let me know how to do this? (or) Any suggestions?

Source Vertex   Destination Vertex
1               2
2               3
3               4
4               5
5               6

Regards, Rajesh
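For the ancestor sets Rajesh describes, one hedged sketch is to propagate the sets against the edge direction (messages flow from destination back to source), using EdgeDirection.In as suggested above; this is untested against the exact data in the thread:

import org.apache.spark.graphx._

val ancestors = Pregel(
  graph.mapVertices((_, _) => Set.empty[VertexId]),  // start with empty ancestor sets
  Set.empty[VertexId],                               // initial (empty) message
  Int.MaxValue,
  EdgeDirection.In)(                                 // edges stay active while their dst still changes
  (_, attr, msg) => attr ++ msg,                     // merge incoming ancestors into the vertex set
  edge => {
    val fromDst = edge.dstAttr + edge.dstId          // everything reachable via the destination
    if (edge.srcId != edge.dstId && !fromDst.subsetOf(edge.srcAttr))
      Iterator((edge.srcId, fromDst))                // send back along the edge
    else Iterator.empty
  },
  _ ++ _)                                            // combine concurrent messages

ancestors.vertices.collect.foreach(println(_))

The subset check stops re-sending once a source vertex already knows everything downstream, so the computation terminates; the self-loop at vertex 6 is skipped, leaving it with an empty set as desired.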
Re: Supporting Hive features in Spark SQL Thrift JDBC server
Hello Shahab, I think CassandraAwareHiveContext https://github.com/tuplejump/calliope/blob/develop/sql/hive/src/main/scala/org/apache/spark/sql/hive/CassandraAwareHiveContext.scala in Calliope is what you are looking for. Create a CAHC instance and you should be able to run Hive functions against the SchemaRDD you create from there. Cheers, Rohit

Founder & CEO, Tuplejump, Inc.
www.tuplejump.com
The Data Engineering Platform

On Tue, Mar 3, 2015 at 6:03 AM, Cheng, Hao hao.ch...@intel.com wrote: The temp table in the metastore can not be shared across SQLContext instances. Since HiveContext is a subclass of SQLContext (it inherits all of its functionality), why not use a single HiveContext globally? Is there any specific requirement in your case that you need multiple SQLContexts/HiveContexts?

From: shahab [mailto:shahab.mok...@gmail.com] Sent: Tuesday, March 3, 2015 9:46 PM To: Cheng, Hao Cc: user@spark.apache.org Subject: Re: Supporting Hive features in Spark SQL Thrift JDBC server You are right, CassandraAwareSQLContext is a subclass of SQLContext. But I did another experiment: I queried Cassandra using CassandraAwareSQLContext, then I registered the RDD as a temp table; next I tried to query it using HiveContext, but it seems that the Hive context can not see the table registered using the SQL context. Is this a normal case? best, /Shahab

On Tue, Mar 3, 2015 at 1:35 PM, Cheng, Hao hao.ch...@intel.com wrote: Hive UDFs are only applicable for HiveContext and its subclass instances; is the CassandraAwareSQLContext a direct subclass of HiveContext or of SQLContext?

From: shahab [mailto:shahab.mok...@gmail.com] Sent: Tuesday, March 3, 2015 5:10 PM To: Cheng, Hao Cc: user@spark.apache.org Subject: Re: Supporting Hive features in Spark SQL Thrift JDBC server

val sc: SparkContext = new SparkContext(conf)
val sqlCassContext = new CassandraAwareSQLContext(sc) // I used the Calliope Cassandra Spark connector
val rdd: SchemaRDD = sqlCassContext.sql("select * from db.profile")
rdd.cache
rdd.registerTempTable("profile")
rdd.first // enforce caching
val q = "select from_unixtime(floor(createdAt/1000)) from profile where sampling_bucket=0"
val rdd2 = rdd.sqlContext.sql(q)
println("Result: " + rdd2.first)

And I get the following errors:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'from_unixtime('floor(('createdAt / 1000))) AS c0#7, tree: Project ['from_unixtime('floor(('createdAt / 1000))) AS c0#7] Filter (sampling_bucket#10 = 0) Subquery profile Project [company#8,bucket#9,sampling_bucket#10,profileid#11,createdat#12L,modifiedat#13L,version#14] CassandraRelation localhost, 9042, 9160, normaldb_sampling, profile, org.apache.spark.sql.CassandraAwareSQLContext@778b692d, None, None, false, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at
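A minimal sketch following Rohit's pointer, assuming the CassandraAwareHiveContext constructor mirrors the SQLContext variant used earlier in the thread (unverified against the Calliope source):

import org.apache.spark.sql.hive.CassandraAwareHiveContext

val hc = new CassandraAwareHiveContext(sc)
hc.sql("select * from normaldb_sampling.profile").registerTempTable("profile")
// Hive UDFs such as from_unixtime should resolve under a Hive-derived context
val result = hc.sql("select from_unixtime(floor(createdAt/1000)) from profile where sampling_bucket = 0")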
Resource manager UI for Spark applications
Hi, I have 2 questions: 1. I was trying to use the Resource Manager UI for my Spark application using yarn-cluster mode, as I observed that the Spark UI does not work for yarn-cluster. Is that correct, or am I missing some setup?
Re: PRNG in Scala
This is more of a Java/Scala question than a Spark one - it uses java.util.Random: https://github.com/scala/scala/blob/2.11.x/src/library/scala/util/Random.scala

On 3 Mar 2015, at 15:08, Vijayasarathy Kannan kvi...@vt.edu wrote: Hi, What pseudo-random-number generator does scala.util.Random use?
Re: GraphX path traversal
What about the following, which can be run in the spark shell:

    import org.apache.spark._
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    val vertexlist = Array((1L, "One"), (2L, "Two"), (3L, "Three"), (4L, "Four"), (5L, "Five"), (6L, "Six"))
    val edgelist = Array(Edge(6, 5, "6 to 5"), Edge(5, 4, "5 to 4"), Edge(4, 3, "4 to 3"), Edge(3, 2, "3 to 2"), Edge(2, 1, "2 to 1"))
    val vertices: RDD[(VertexId, String)] = sc.parallelize(vertexlist)
    val edges = sc.parallelize(edgelist)
    val graph = Graph(vertices, edges)
    val triplets = graph.triplets
    triplets.foreach(t => println(s"parent for ${t.dstId} is ${t.srcId}"))

It doesn't set vertex 6 to have parent 6, but you get the idea. It doesn't use Pregel, but that sounds like overkill for what you are trying to achieve. Does that answer your question or were you after something different? On 3 Mar 2015, at 15:12, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Robin, Thank you for your response. Please find my question below. I have the following edge file:

Source Vertex  Destination Vertex
1  2
2  3
3  4
4  5
5  6
6  6

In this graph the 1st vertex is connected to the 2nd vertex, the 2nd vertex is connected to the 3rd vertex, ..., and the 6th vertex is connected to the 6th vertex, so the 6th vertex is the root node. (A graph image was attached here.) In this graph, how can I compute the 1st vertex's parents, i.e. 2, 3, 4, 5, 6? Similarly the 2nd vertex's parents are 3, 4, 5, 6, and the 6th vertex's parent is just 6 because it is the root node. I'm planning to use the Pregel API, but I'm not able to define the messages and the vertex program in that API. Could you please help me with this? Please let me know if you need more information. Regards, Rajesh On Tue, Mar 3, 2015 at 8:15 PM, Robin East robin.e...@xense.co.uk wrote: Rajesh I'm not sure if I can help you, however I don't even understand the question. Could you restate what you are trying to do. Sent from my iPhone On 2 Mar 2015, at 11:17, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi, I have the edge list below. How do I find the parent path for every vertex? Example: Vertex 1 path: 2, 3, 4, 5, 6. Vertex 2 path: 3, 4, 5, 6. Vertex 3 path: 4, 5, 6. Vertex 4 path: 5, 6. Vertex 5 path: 6. Could you please let me know how to do this? (or) Any suggestion?

Source Vertex  Destination Vertex
1  2
2  3
3  4
4  5
5  6

Regards, Rajesh
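Since Rajesh asked specifically about Pregel, here is a hedged sketch of one way to collect every vertex's full ancestor set with graph.pregel, building on Robin's graph above (edges point from parent to child; the vertex attributes are replaced by ancestor sets). This is an illustration of how the messages and the vertex program can be defined, not a tuned implementation:

    // start every vertex with an empty ancestor set
    val init = graph.mapVertices((_, _) => Set.empty[VertexId])
    val ancestors = init.pregel(Set.empty[VertexId])(
      // vertex program: fold newly received ancestor ids into the vertex state
      (id, attr, msg) => attr ++ msg,
      // send: the child (dst) learns its parent (src) plus everything the parent already knows
      triplet => {
        val known = triplet.srcAttr + triplet.srcId
        if ((known -- triplet.dstAttr).nonEmpty) Iterator((triplet.dstId, known))
        else Iterator.empty
      },
      // merge messages arriving at the same vertex
      (a, b) => a ++ b)
    ancestors.vertices.collect.foreach { case (id, set) =>
      println(s"vertex $id ancestors: ${set.toList.sorted.mkString(",")}")
    }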
Re: Supporting Hive features in Spark SQL Thrift JDBC server
@Cheng: My problem is that the connector I use to query Spark does not support the latest Hive (0.12, 0.13), but I need to perform Hive queries on data retrieved from Cassandra. I assumed that if I get the data out of Cassandra in some way and register it as a temp table I would be able to query it using HiveContext, but it seems I cannot do this! @Yin: yes, it is added in Hive 0.12, but do you mean it is not supported by HiveContext in Spark? Thanks, /Shahab On Tue, Mar 3, 2015 at 5:23 PM, Yin Huai yh...@databricks.com wrote: Regarding current_date, I think it is not in either Hive 0.12.0 or 0.13.1 (the versions that we support). It seems https://issues.apache.org/jira/browse/HIVE-5472 added it to Hive recently.
Re: Supporting Hive features in Spark SQL Thrift JDBC server
Thanks Rohit, I am already using Calliope and quite happy with it, well done! Except for the fact that: 1. It seems that it does not support Hive 0.12 or higher, am I right? For example, you cannot use the current_time() UDF, or those new UDFs added in Hive 0.12. Are they supported? Any plan for supporting them? 2. It does not support Spark 1.1 and 1.2. Any plan for a new release? best, /Shahab On Tue, Mar 3, 2015 at 5:41 PM, Rohit Rai ro...@tuplejump.com wrote: Hello Shahab, I think CassandraAwareHiveContext https://github.com/tuplejump/calliope/blob/develop/sql/hive/src/main/scala/org/apache/spark/sql/hive/CassandraAwareHiveContext.scala in Calliope is what you are looking for. Create a CAHC instance and you should be able to run Hive functions against the SchemaRDD you create from there. Cheers, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform*
Re: SparkSQL, executing an OR
thanks, it works. 2015-03-03 13:32 GMT+01:00 Cheng, Hao hao.ch...@intel.com: Use where('age >= 10 || 'age <= 4) instead. -----Original Message----- From: Guillermo Ortiz [mailto:konstt2...@gmail.com] Sent: Tuesday, March 3, 2015 5:14 PM To: user Subject: SparkSQL, executing an OR I'm trying to execute a query with Spark (example from the Spark documentation):

    val teenagers = people.where('age >= 10).where('age <= 19).select('name)

Is it possible to execute an OR with this syntax?

    val teenagers = people.where('age >= 10 'or 'age <= 4).where('age <= 19).select('name)

I have tried different ways and I didn't get it.
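For completeness, a hedged sketch of the same OR filter written as a SQL string (assuming people has been registered as a temp table on a sqlContext), which sidesteps the DSL operators entirely:

    people.registerTempTable("people")
    val result = sqlContext.sql("SELECT name FROM people WHERE age >= 10 OR age <= 4")
    result.collect().foreach(println)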
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
The scala syntax for arrays is Array[T], not T[], so you want to use something like:

    kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
    kryo.register(classOf[Array[Short]])

Nonetheless, Spark should take care of this itself. I'll look into it later today. On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote: I think this is a Java vs Scala syntax issue. Will check. On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote: The problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround:

    import org.apache.spark.scheduler._
    import org.roaringbitmap._
    ...
    kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
    kryo.register(classOf[org.roaringbitmap.RoaringArray])
    kryo.register(classOf[org.roaringbitmap.ArrayContainer])
    kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
    kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
    kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
    kryo.register(classOf[short[]])

In the build file:

    libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.4.8"

This fails to compile: ...:53: identifier expected but ']' found. [error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) also: :54: identifier expected but ']' found. [error] kryo.register(classOf[short[]]) also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler [error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) Suggestions? Arun
RE: java.lang.IncompatibleClassChangeError when using PrunedFilteredScan
As the call stack shows, the mongodb connector is not compatible with the Spark SQL Data Source interface. The Data Source API changed in 1.2, so you probably need to confirm which Spark version the MongoDB connector was built against. By the way, a well-formatted call stack will be more helpful for people reading. From: taoewang [mailto:taoew...@sequoiadb.com] Sent: Tuesday, March 3, 2015 7:39 PM To: user@spark.apache.org Subject: java.lang.IncompatibleClassChangeError when using PrunedFilteredScan Hi, I'm trying to build the stratio spark-mongodb connector and got the error "java.lang.IncompatibleClassChangeError: class com.stratio.deep.mongodb.MongodbRelation has interface org.apache.spark.sql.sources.PrunedFilteredScan as super class" when trying to create a table using the driver:

    scala> import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.SQLContext

    scala> val sqlContext = new SQLContext(sc)
    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@37050c15

    scala> import com.stratio.deep.mongodb
    import com.stratio.deep.mongodb

    scala> sqlContext.sql("CREATE TEMPORARY TABLE students_table USING com.stratio.deep.mongodb OPTIONS (host 'host:port', database 'highschool', collection 'students')")
    java.lang.IncompatibleClassChangeError: class com.stratio.deep.mongodb.MongodbRelation has interface org.apache.spark.sql.sources.PrunedFilteredScan as super class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
    at java.lang.Class.getConstructor0(Class.java:2885)
    at java.lang.Class.newInstance(Class.java:350)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:288)
    at org.apache.spark.sql.sources.CreateTempTableUsing.run(ddl.scala:376)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55)

The code failed at line 288 in ddl.scala:

    def apply(
        sqlContext: SQLContext,
        userSpecifiedSchema: Option[StructType],
        provider: String,
        options: Map[String, String]): ResolvedDataSource = {
      val clazz: Class[_] = lookupDataSource(provider)
      val relation = userSpecifiedSchema match {
        case Some(schema: StructType) => clazz.newInstance() match {
          case dataSource: SchemaRelationProvider =>
            dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
          case dataSource: org.apache.spark.sql.sources.RelationProvider =>
            sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
        }
        case None => clazz.newInstance() match {  // <--- failed here
          case dataSource: RelationProvider =>
            dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
          case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
            sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
        }
      }
      new ResolvedDataSource(clazz, relation)
    }

The "clazz" here is com.stratio.deep.mongodb.DefaultSource, which extends RelationProvider:

    class DefaultSource extends RelationProvider {
      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String]): BaseRelation = {
        /** We will assume hosts are provided like 'host:port,host2:port2,...' */
        val host = parameters
          .getOrElse(Host, notFound[String](Host))
          .split(",").toList
        val database = parameters.getOrElse(Database, notFound(Database))
        val collection = parameters.getOrElse(Collection, notFound(Collection))
        val samplingRatio = parameters
          .get(SamplingRatio)
          .map(_.toDouble).getOrElse(DefaultSamplingRatio)
        MongodbRelation(
          MongodbConfigBuilder()
            .set(Host, host)
            .set(Database, database)
Re: Supporting Hive features in Spark SQL Thrift JDBC server
You are right, CassandraAwareSQLContext is a subclass of SQLContext. But I did another experiment: I queried Cassandra using CassandraAwareSQLContext, then I registered the RDD as a temp table, and next I tried to query it using HiveContext, but it seems that the Hive context cannot see the table registered using the SQL context. Is this a normal case? best, /Shahab On Tue, Mar 3, 2015 at 1:35 PM, Cheng, Hao hao.ch...@intel.com wrote: Hive UDFs are only applicable for HiveContext and its subclass instances; is the CassandraAwareSQLContext a direct subclass of HiveContext or SQLContext?
spark.local.dir leads to Job cancelled because SparkContext was shut down
As long as I set spark.local.dir to multiple disks, the job fails with the errors below. (If I set spark.local.dir to only one dir, the job succeeds...)

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
at akka.actor.ActorCell.terminate(ActorCell.scala:338)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-local-dir-leads-to-Job-cancelled-because-SparkContext-was-shut-down-tp21894.html
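For reference, spark.local.dir accepts a comma-separated list of directories; a minimal sketch with placeholder paths is below. Every listed directory must exist and be writable by the Spark user on every node, which is worth verifying when a single directory works but a multi-disk list fails:

    // in code, before creating the SparkContext (paths are placeholders):
    val conf = new SparkConf().set("spark.local.dir", "/data1/spark/tmp,/data2/spark/tmp")
    // or equivalently in conf/spark-defaults.conf:
    //   spark.local.dir  /data1/spark/tmp,/data2/spark/tmp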
[no subject]
I did an experiment with the Hive and SQL contexts. I queried Cassandra using CassandraAwareSQLContext (a custom SQL context from Calliope), then I registered the RDD as a temp table, and next I tried to query it using HiveContext, but it seems that the Hive context cannot see the table registered using the SQL context. Is this a normal case? Stack trace:

ERROR hive.ql.metadata.Hive - NoSuchObjectException(message:default.MyTableName table not found)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1373)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103)

best, /Shahab
Re: Resource manager UI for Spark applications
Sorry for the half email - here it is again in full. Hi, I have 2 questions - 1. I was trying to use the Resource Manager UI for my Spark application using yarn-cluster mode, as I observed that the Spark UI does not work for yarn-cluster. Is that correct or am I missing some setup? 2. When I click on Application Monitoring or History, I get redirected to some link with an internal IP address. Even if I replace that address with the public IP, it still does not work. What kind of setup changes are needed for that? Thanks -roni
Solve least squares problem of the form min norm(A x - b)^2 + lambda * n * norm(x)^2 ?
Dear all, Is there a least squares solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of Spark? It seems that the only least squares solver available in Spark is private to the recommender package. Cheers, Jao
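For reference, the objective in the subject line is ridge regression; its minimizer is characterized by the regularized normal equations (a standard identity, stated here for context):

    \min_x \; \lVert Ax - b \rVert_2^2 + \lambda\, n\, \lVert x \rVert_2^2
    \quad\Longleftrightarrow\quad
    \left( A^\top A + \lambda\, n\, I \right) x = A^\top b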
Re: Supporting Hive features in Spark SQL Thrift JDBC server
The Hive dependency comes from spark-hive. It does work with Spark 1.1; we will have the 1.2 release later this month.
Re: Spark Error: Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077
Hello Ted, Some progress: now it seems that the spark job does get submitted; in the Spark web UI, I do see this under finished drivers. However, it seems to not go past this step:

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jsc, "localhost:2181", "aa", topicMap);

I do have several messages in the Kafka topic. When I run the consumer with the from-beginning option, I do see all the messages.

krishs-mbp:bin hadoop$ ./spark-submit --jars /Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar --class KafkaMain --master spark://krishs-mbp:7077 --deploy-mode cluster --num-executors 100 --driver-memory 1g --executor-memory 1g --supervise file:///Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar
Sending launch command to spark://krishs-mbp:7077
Driver successfully submitted as driver-20150303085115-0007
... waiting before polling master for driver state
... polling master for driver state
State of driver-20150303085115-0007 is FINISHED

On Monday, March 2, 2015 4:31 PM, Ted Yu yuzhih...@gmail.com wrote: In AkkaUtils.scala:

    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)

Can you turn on lifecycle event logging to see if you would get some more clues? Cheers On Mon, Mar 2, 2015 at 3:56 PM, Krishnanand Khambadkone kkhambadk...@yahoo.com wrote: I see these messages now:

    spark.master - spark://krishs-mbp:7077
    Classpath elements:
    Sending launch command to spark://krishs-mbp:7077
    Driver successfully submitted as driver-20150302155433-
    ... waiting before polling master for driver state
    ... polling master for driver state
    State of driver-20150302155433- is FAILED

On Monday, March 2, 2015 2:42 PM, Ted Yu yuzhih...@gmail.com wrote: bq. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 There should be some more output following the above line. Can you post it? Cheers On Mon, Mar 2, 2015 at 2:06 PM, Krishnanand Khambadkone kkhambadk...@yahoo.com.invalid wrote: Hi, I am running spark on my mac. It is reading from a kafka topic and then writes the data to an hbase table. When I do a spark-submit, I get this error: Error connecting to master spark://localhost:7077 (akka.tcp://sparkMaster@localhost:7077), exiting. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 My submit statement looks like this: ./spark-submit --jars /Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar --class KafkaMain --master spark://localhost:7077 --deploy-mode cluster --num-executors 100 --driver-memory 1g --executor-memory 1g /Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar
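One thing worth checking, offered only as a guess: a streaming driver that goes straight to FINISHED after createStream has often simply never started the streaming context, since nothing runs until start() is called and at least one output operation is defined. A minimal sketch of the usual pattern, shown in Scala for brevity (the same calls exist on JavaStreamingContext; ssc and topicMap stand in for the objects in the snippet above):

    import org.apache.spark.streaming.kafka.KafkaUtils

    val messages = KafkaUtils.createStream(ssc, "localhost:2181", "aa", topicMap)
    messages.print()        // streaming needs at least one output operation
    ssc.start()             // nothing runs until the context is started
    ssc.awaitTermination()  // keep the driver alive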
Re: Resource manager UI for Spark applications
bq. spark UI does not work for Yarn-cluster. Can you be a bit more specific on the error(s) you saw? What Spark release are you using? Cheers
[no subject]
Hi, I got this error message:

15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:146)
at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

And then for the same index file and executor, I got the following errors multiple times:

15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from host-:39534
java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory)
15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_13_1228, and will not retry (0 retries)
java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory)
...
Caused by: java.net.ConnectException: Connection refused: host-

What's the problem? BTW, I'm using a Spark 1.2.1-SNAPSHOT I built around Dec. 20. Are there any bug fixes related to shuffle block fetching or index files after that? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Spark Monitoring UI for Hadoop Yarn Cluster
Hi Srini, If you start $SPARK_HOME/sbin/start-history-server.sh, you should be able to see the basic Spark UI. You will not see the master, but you will be able to see the rest, as I recall. You also need to add an entry to spark-defaults.conf, something like this:

## Make sure the host and port match the node where your YARN history server is running
spark.yarn.historyServer.address localhost:18080

HTH. -Todd On Tue, Mar 3, 2015 at 12:47 PM, Srini Karri skarri@gmail.com wrote: Hi All, I am having trouble finding data related to my requirement. Here is the context: I have tried a standalone Spark installation on Windows, and I am able to submit the logs and see the history of events. My question is: is it possible to achieve the same monitoring UI experience with a YARN cluster, like viewing workers and running/completed job stages in the web UI? Currently, if we go to our YARN Resource Manager UI, we are able to see the Spark jobs and their logs, but it is not as rich as the Spark standalone master UI. Is this a limitation of the hadoop YARN cluster, or is there any way we can hook this Spark standalone master to the YARN cluster? Any help is highly appreciated. Regards, Srini.
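For completeness, a sketch of the related spark-defaults.conf entries that tie the application side and the history server together (host and paths are placeholders); the event log directory the applications write to must be the same directory the history server reads:

    spark.eventLog.enabled            true
    spark.eventLog.dir                hdfs:///spark-history
    spark.history.fs.logDirectory     hdfs:///spark-history
    spark.yarn.historyServer.address  localhost:18080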
Re: Resource manager UI for Spark applications
bq. changing the address with internal to the external one, but still does not work. Not sure what happened. For the time being, you can use the yarn command line to pull container logs (put in your appId and containerId):

yarn logs -applicationId application_1386639398517_0007 -containerId container_1386639398517_0007_01_19

Cheers
Why different numbers of partitions give different results for the same computation on the same dataset?
Hi, I have a spark streaming application, running on a single node, consisting mainly of map operations. I perform repartitioning to control the number of CPU cores that I want to use. The code goes like this:

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")
    val words = distFile.repartition(cores.toInt).flatMap(_.split(" ")).filter(_.length > 3)
    val wordCharValues = words.map(word => {
      var sum = 0
      word.toCharArray.foreach(c => { sum += c.toInt })
      sum.toDouble / word.length.toDouble
    }).foreachRDD(rdd => {
      println("MEAN: " + rdd.mean())
    })

I have 2 questions: 1) How can I use coalesce in this code instead of repartition? 2) Why, using the same dataset (which is a small file processed within a single batch), does the result that I obtain for the mean vary with the number of partitions? If I don't call the repartition method, the result is always the same for every execution, as it should be. But repartitioning into, for instance, 2 partitions gives a different mean value than using 8 partitions. I really don't understand why, given that my code is deterministic. Can someone enlighten me on this? Thanks.
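On question (1): DStream itself does not expose coalesce in this API, but transform gives access to the underlying RDDs, so a hedged sketch of the equivalent (assuming the same cores variable) would be:

    val words = distFile
      .transform(rdd => rdd.coalesce(cores.toInt))  // coalesce avoids the full shuffle repartition does when shrinking the partition count
      .flatMap(_.split(" "))
      .filter(_.length > 3)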
Re: Resource manager UI for Spark applications
Hi Ted, I used s3://support.elasticmapreduce/spark/install-spark to install spark on my EMR cluster. It is 1.2.0. When I click on the link for history or logs, it takes me to http://ip-172-31-43-116.us-west-2.compute.internal:9035/node/containerlogs/container_1424105590052_0070_01_01/hadoop and I get: The server at *ip-172-31-43-116.us-west-2.compute.internal* can't be found, because the DNS lookup failed. DNS is the network service that translates a website's name to its Internet address. This error is most often caused by having no connection to the Internet or a misconfigured network. It can also be caused by an unresponsive DNS server or a firewall preventing Google Chrome from accessing the network. I tried changing the internal address to the external one, but it still does not work. Thanks -roni
Re: Having lots of FetchFailedException in join
Failed to connect implies that the executor at that host died; please check its logs as well.
Re: Spark Monitoring UI for Hadoop Yarn Cluster
Spark applications shown in the RM's UI should have an Application Master link when they're running. That takes you to the Spark UI for that application, where you can see all the information you're looking for. If you're running a history server and add spark.yarn.historyServer.address to your config, that link will become a History link after the application is finished, and will take you to the history server to view the app's UI. -- Marcelo
Re: Issue using S3 bucket from Spark 1.2.1 with hadoop 2.4
If you can use the hadoop 2.6.0 binary, you can use s3a. s3a is being polished in the upcoming 2.7.0 release: https://issues.apache.org/jira/browse/HADOOP-11571 Cheers On Tue, Mar 3, 2015 at 9:44 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi, We recently upgraded to the Spark 1.2.1 - Hadoop 2.4 binary. We do not have any other dependency on hadoop jars, except for reading our source files from S3. Since we upgraded to the latest version, our reads from S3 have slowed down considerably. For some jobs we see the read from S3 stall for a long time before it starts. Is there a known issue with S3, or do we need to change any settings? The only settings that we are using are:

    sc.hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
    sc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "someKey");
    sc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "someSecret");

Thanks for the help!! - Ankur
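If you do move to the hadoop 2.6.0 binary, a hedged sketch of the s3a equivalents of the settings above (key names are from the hadoop-aws module; the credentials are placeholders):

    sc.hadoopConfiguration.set("fs.s3a.access.key", "someKey")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "someSecret")
    // then read with s3a:// URIs instead of s3n://, e.g. sc.textFile("s3a://bucket/path")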
Re: Issue using S3 bucket from Spark 1.2.1 with hadoop 2.4
Thanks a lot Ted!!
Re: Solve least square problem of the form min norm(A x - b)^2^ + lambda * n * norm(x)^2 ?
There are a couple of solvers that I've written that are part of the AMPLab ml-matrix repo [1, 2]. These aren't part of MLlib yet, though, and if you are interested in porting them I'd be happy to review it. Thanks Shivaram [1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala [2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala
Re: Having lots of FetchFailedException in join
Sorry that I forgot the subject. And in the driver, I got many FetchFailedExceptions. The error messages are:

15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID 7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0, mapId=24, reduceId=1220, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /:43070
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)

Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Supporting Hive features in Spark SQL Thrift JDBC server
@Yin: sorry for my mistake, you are right, it was added in 1.2, not 0.12.0, my bad! On Tue, Mar 3, 2015 at 6:47 PM, shahab shahab.mok...@gmail.com wrote: Thanks Rohit, yes my mistake, it does work with 1.1 (I am actually running it on Spark 1.1). But do you mean that even the HiveContext of Spark (not Calliope's CassandraAwareHiveContext) does not support Hive 0.12?? best, /Shahab On Tue, Mar 3, 2015 at 5:55 PM, Rohit Rai ro...@tuplejump.com wrote: The Hive dependency comes from spark-hive. It does work with Spark 1.1; we will have the 1.2 release later this month. On Mar 3, 2015 8:49 AM, shahab shahab.mok...@gmail.com wrote: Thanks Rohit, I am already using Calliope and quite happy with it, well done! Except the fact that: 1- It seems that it does not support Hive 0.12 or higher, am I right? For example you can not use the current_time() UDF, or those new UDFs added in Hive 0.12. Are they supported? Any plan for supporting them? 2- It does not support Spark 1.1 and 1.2. Any plan for a new release? best, /Shahab On Tue, Mar 3, 2015 at 5:41 PM, Rohit Rai ro...@tuplejump.com wrote: Hello Shahab, I think CassandraAwareHiveContext https://github.com/tuplejump/calliope/blob/develop/sql/hive/src/main/scala/org/apache/spark/sql/hive/CassandraAwareHiveContext.scala in Calliope is what you are looking for. Create a CAHC instance and you should be able to run Hive functions against the SchemaRDD you create from there. Cheers, Rohit Founder CEO, Tuplejump, Inc. www.tuplejump.com The Data Engineering Platform On Tue, Mar 3, 2015 at 6:03 AM, Cheng, Hao hao.ch...@intel.com wrote: The temp table in the metastore can not be shared across SQLContext instances. Since HiveContext is a subclass of SQLContext (and inherits all of its functionality), why not use a single HiveContext globally? Is there any specific requirement in your case that you need multiple SQLContext/HiveContext instances? From: shahab [mailto:shahab.mok...@gmail.com] Sent: Tuesday, March 3, 2015 9:46 PM To: Cheng, Hao Cc: user@spark.apache.org Subject: Re: Supporting Hive features in Spark SQL Thrift JDBC server You are right, CassandraAwareSQLContext is a subclass of SQLContext. But I did another experiment: I queried Cassandra using CassandraAwareSQLContext, then I registered the rdd as a temp table, next I tried to query it using HiveContext, but it seems that the Hive context can not see the table registered using the SQL context. Is this a normal case? best, /Shahab On Tue, Mar 3, 2015 at 1:35 PM, Cheng, Hao hao.ch...@intel.com wrote: Hive UDFs are only applicable for HiveContext and its subclass instances; is the CassandraAwareSQLContext a direct subclass of HiveContext or SQLContext?
From: shahab [mailto:shahab.mok...@gmail.com] Sent: Tuesday, March 3, 2015 5:10 PM To: Cheng, Hao Cc: user@spark.apache.org Subject: Re: Supporting Hive features in Spark SQL Thrift JDBC server

val sc: SparkContext = new SparkContext(conf)
val sqlCassContext = new CassandraAwareSQLContext(sc) // I used the Calliope Cassandra Spark connector
val rdd: SchemaRDD = sqlCassContext.sql("select * from db.profile")
rdd.cache
rdd.registerTempTable("profile")
rdd.first // enforce caching
val q = "select from_unixtime(floor(createdAt/1000)) from profile where sampling_bucket=0"
val rdd2 = rdd.sqlContext.sql(q)
println("Result: " + rdd2.first)

And I get the following errors:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'from_unixtime('floor(('createdAt / 1000))) AS c0#7, tree:
Project ['from_unixtime('floor(('createdAt / 1000))) AS c0#7]
 Filter (sampling_bucket#10 = 0)
  Subquery profile
   Project [company#8,bucket#9,sampling_bucket#10,profileid#11,createdat#12L,modifiedat#13L,version#14]
    CassandraRelation localhost, 9042, 9160, normaldb_sampling, profile, org.apache.spark.sql.CassandraAwareSQLContext@778b692d, None, None, false, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml)

at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
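For reference, a minimal sketch of the single-context suggestion from this thread: register the temp table and run the Hive UDF on one HiveContext, so the table is visible to the query. This assumes the data is reachable from a plain HiveContext (with Calliope you would use its Hive-aware context instead); the names reuse the example above.

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)                 // one context used for both steps
val rdd = hc.sql("select * from db.profile") // assumes this source is visible to hc
rdd.registerTempTable("profile")
// from_unixtime is a Hive UDF, so it resolves on a HiveContext
hc.sql("select from_unixtime(floor(createdAt/1000)) from profile where sampling_bucket=0").first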
UnsatisfiedLinkError related to libgfortran when running MLLIB code on RHEL 5.8
Hi Folks, We are trying to run the following code from the spark shell in a CDH 5.3 cluster running on RHEL 5.8.

spark-shell --master yarn --deploy-mode client --num-executors 15 --executor-cores 6 --executor-memory 12G

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
val users_item_score_clean = sc.textFile("/tmp/spark_mllib_test").map(_.split(","))
val ratings = users_item_score_clean.map(x => Rating(x(0).toInt, x(1).toInt, x(2).toDouble))
val rank = 10
val numIterations = 20
val alpha = 1.0
val lambda = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

We are getting the following error (the detailed error is attached in error.log):

-- org.jblas ERROR Couldn't load copied link file: java.lang.UnsatisfiedLinkError: /u08/hadoop/yarn/nm/usercache/sharma.p/appcache/application_1425015707226_0128/container_e12_1425015707226_0128_01_10/tmp/jblas7605371780736016929libjblas_arch_flavor.so: libgfortran.so.3: cannot open shared object file: No such file or directory.
On Linux 64bit, you need additional support libraries. You need to install libgfortran3.
For example for debian or Ubuntu, type sudo apt-get install libgfortran3
For more information, see https://github.com/mikiobraun/jblas/wiki/Missing-Libraries

15/03/02 14:50:25 ERROR executor.Executor: Exception in task 22.0 in stage 6.0 (TID 374)
java.lang.UnsatisfiedLinkError: org.jblas.NativeBlas.dposv(CII[DII[DII)I
at org.jblas.NativeBlas.dposv(Native Method)
at org.jblas.SimpleBlas.posv(SimpleBlas.java:369)

This exact code runs fine on another CDH 5.3 cluster which runs on RHEL 6.5. libgfortran.so.3 is not present on the problematic cluster:

[root@node04 ~]# find / -name libgfortran.so.3 2>/dev/null

I am able to find libgfortran.so.3 on the cluster where the above code works:

[root@workingclusternode04 ~]# find / -name libgfortran.so.3 2>/dev/null
/usr/lib64/libgfortran.so.3

The following output shows that the fortran packages are installed on both clusters:

On the cluster where this is not working:
[root@node04 ~]# yum list | grep -i fortran
gcc-gfortran.x86_64 4.1.2-52.el5_8.1 installed
libgfortran.i386 4.1.2-52.el5_8.1 installed
libgfortran.x86_64 4.1.2-52.el5_8.1 installed

On the cluster where the Spark job is working:
[root@workingclusternode04 ~]# yum list | grep -i fortran
Repository 'bda' is missing name in configuration, using id
compat-libgfortran-41.x86_64 4.1.2-39.el6 @bda
gcc-gfortran.x86_64 4.4.7-4.el6 @bda
libgfortran.x86_64 4.4.7-4.el6 @bda

Has anybody run into this? Any pointers are much appreciated. Regards, Prashant

-- org.jblas ERROR Couldn't load copied link file: java.lang.UnsatisfiedLinkError: /u08/hadoop/yarn/nm/usercache/sharma.p/appcache/application_1425015707226_0128/container_e12_1425015707226_0128_01_10/tmp/jblas7605371780736016929libjblas_arch_flavor.so: libgfortran.so.3: cannot open shared object file: No such file or directory.
On Linux 64bit, you need additional support libraries. You need to install libgfortran3.
For example for debian or Ubuntu, type sudo apt-get install libgfortran3
For more information, see https://github.com/mikiobraun/jblas/wiki/Missing-Libraries

15/03/02 14:50:25 ERROR executor.Executor: Exception in task 22.0 in stage 6.0 (TID 374)
java.lang.UnsatisfiedLinkError: org.jblas.NativeBlas.dposv(CII[DII[DII)I
at org.jblas.NativeBlas.dposv(Native Method)
at org.jblas.SimpleBlas.posv(SimpleBlas.java:369)
at org.jblas.Solve.solvePositive(Solve.java:68)
at org.apache.spark.mllib.recommendation.ALS.solveLeastSquares(ALS.scala:607)
at org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$2.apply(ALS.scala:593)
at org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$2.apply(ALS.scala:581)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:156)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:156)
at org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:581)
at org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:510)
at
Re: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: pyspark on yarn
In Spark 1.2 you'll have to create a partitioned hive table https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AddPartitions in order to read parquet data in this format. In Spark 1.3 the parquet data source will auto-discover partitions when they are laid out in this format. Michael

On Mon, Jan 5, 2015 at 1:01 PM, Sam Flint sam.fl...@magnetic.com wrote: Below is the code that I am running. I get an error for unresolved attributes. Can anyone point me in the right direction? Running from the pyspark shell using yarn: MASTER=yarn-client pyspark. The error is below the code:

# Import SQLContext and data types
from pyspark.sql import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# The result of loading a parquet file is also a SchemaRDD.
# Try loading all data that you have
parquetFile = sqlContext.parquetFile("/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.0.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.1.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.10.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.11.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.2.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.3.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.4.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.5.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.6.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.7.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.8.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.9.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.0.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.1.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.2.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.3.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.4.parq")

# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFileone")

results = sqlContext.sql("SELECT * FROM parquetFileone where key=20141001")

# print results
for result in results.collect():
    print result

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/pyspark/sql.py", line 1615, in collect
    rows = RDD.collect(self)
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/pyspark/rdd.py", line 678, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/pyspark/sql.py", line 1527, in _jrdd
    self._lazy_jrdd = self._jschema_rdd.javaToPython()
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o29.javaToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree:
Project [*]
 Filter ('key = 20141001)
  Subquery parquetFileone
   ParquetRelation
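For illustration, a hedged Scala sketch of the Spark 1.2 workaround Michael describes (the table name and column list are made up; the LOCATION and partition value match the paths in the question):

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
// Declare the partition column in the table definition instead of the data files
hc.sql("""CREATE EXTERNAL TABLE parquet_events (uid BIGINT)
          PARTITIONED BY (key INT)
          STORED AS PARQUET
          LOCATION '/user/hive/warehouse/impala_new_4'""")
// Register the existing key=20141001 directory as a partition
hc.sql("ALTER TABLE parquet_events ADD PARTITION (key=20141001)")
hc.sql("SELECT * FROM parquet_events WHERE key = 20141001").collect()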
Re: LBGFS optimizer performace
Is that error actually occurring in LBFGS? It looks like it might be happening before the data even gets to LBFGS. (Perhaps the outer join you're trying to do is making the dataset size explode a bit.) Are you able to call count() (or any RDD action) on the data before you pass it to LBFGS?

On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres gsala...@ime.usp.br wrote: Just did, with the same error. I think the problem is the data.count() call in LBFGS, because for huge datasets that's naive to do. I was thinking to write my own version of LBFGS, but instead of doing data.count() I will pass that parameter, which I will calculate from a Spark SQL query. I will let you know. Thanks

On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you try increasing your driver memory, reducing the executors and increasing the executor memory? Thanks Best Regards

On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres gsala...@ime.usp.br wrote: Hi there: I'm using the LBFGS optimizer to train a logistic regression model. The code I implemented follows the pattern shown in https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html, but the training data is obtained from a Spark SQL RDD. The problem I'm having is that LBFGS tries to count the elements in my RDD and that results in an OOM exception, since my dataset is huge. I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on Hadoop YARN. My dataset is about 150 GB, but I sample it (I take only 1% of the data) in order to scale logistic regression. The exception I'm getting is this:

15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal): java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

I'm using these parameters at runtime: --num-executors 128 --executor-memory 1G --driver-memory 4G --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.storage.memoryFraction=0.2

I also persist my dataset using MEMORY_AND_DISK_SER but get
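A minimal sketch of the check suggested at the top of this thread, with hypothetical names (trainingData stands for whatever RDD is handed to LBFGS): materialize and count the data yourself before calling the optimizer, so an upstream problem shows up before optimization starts.

import org.apache.spark.storage.StorageLevel

val cached = trainingData.persist(StorageLevel.MEMORY_AND_DISK_SER)
// If this count() alone OOMs, the problem is upstream of LBFGS (e.g. the join)
println("training examples: " + cached.count())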
Re: LATERAL VIEW explode requests the full schema
I believe that this has been optimized https://github.com/apache/spark/commit/2a36292534a1e9f7a501e88f69bfc3a09fb62cb3 in Spark 1.3.

On Tue, Mar 3, 2015 at 4:36 AM, matthes matthias.diekst...@web.de wrote: I use LATERAL VIEW explode(...) to read data from a parquet file, but the full schema is requested by Parquet instead of just the used columns. When I don't use LATERAL VIEW, the requested schema has just the two columns which I use. Is this correct, or is there room for an optimization, or am I misunderstanding something? Here are my examples:

1) hiveContext.sql("SELECT userid FROM pef WHERE observeddays==20140509")

The requested schema is:
{
  optional group observedDays (LIST) {
    repeated int32 array;
  }
  required int64 userid;
}

This is what I expect, although the result does not work, but that is not the problem here!

2) hiveContext.sql("SELECT userid FROM pef LATERAL VIEW explode(observeddays) od AS observed WHERE observed==20140509")

The requested schema is:
{
  required int64 userid;
  optional int32 source;
  optional group observedDays (LIST) {
    repeated int32 array;
  }
  optional group placetobe (LIST) {
    repeated group bag {
      optional group array {
        optional binary palces (UTF8);
        optional group dates (LIST) {
          repeated int32 array;
        }
      }
    }
  }
}

Why does Parquet request the full schema? I only use two fields of the table. Can somebody please explain why this happens? Thanks!
Re: throughput in the web console?
Sorry, I made a mistake. Please ignore my question. On Tue, Mar 3, 2015 at 2:47 AM, Saiph Kappa saiph.ka...@gmail.com wrote: I performed repartitioning and everything went fine with respect to the number of CPU cores being used (and the respective times). However, I noticed something very strange: inside a map operation I was doing a very simple calculation and always using the same dataset (small enough to be entirely processed in the same batch); then I iterated the RDDs and calculated the mean, foreachRDD(rdd => println("MEAN: " + rdd.mean())). I noticed that for different numbers of partitions (for instance, 4 and 8), the result of the mean is different. Why does this happen? On Thu, Feb 26, 2015 at 7:03 PM, Tathagata Das t...@databricks.com wrote: If you have one receiver, and you are doing only map-like operations, then the processing will primarily happen on one machine. To use all the machines, either receive in parallel with multiple receivers, or spread out the computation by explicitly repartitioning the received streams (DStream.repartition) with sufficient partitions to load balance across more machines. TD On Thu, Feb 26, 2015 at 9:52 AM, Saiph Kappa saiph.ka...@gmail.com wrote: One more question: while processing the exact same batch, I noticed that giving more CPUs to the worker does not decrease the duration of the batch. I tried this with 4 and 8 CPUs. Though I noticed that with only 1 CPU the duration increased, apart from that the values were pretty similar, whether I was using 4 or 6 or 8 CPUs. On Thu, Feb 26, 2015 at 5:35 PM, Saiph Kappa saiph.ka...@gmail.com wrote: By setting spark.eventLog.enabled to true it is possible to see the application UI after the application has finished its execution; however, the Streaming tab is no longer visible. For measuring the duration of batches in the code I am doing something like this:

wordCharValues.foreachRDD(rdd => {
  val startTick = System.currentTimeMillis()
  val result = rdd.take(1)
  val timeDiff = System.currentTimeMillis() - startTick
})

But my question is: is it possible to see the rate/throughput (records/sec) when I have a stream that processes log files appearing in a folder? On Thu, Feb 26, 2015 at 1:36 AM, Tathagata Das t...@databricks.com wrote: Yes. The number of tuples processed in a batch = the sum of all the tuples received by all the receivers. In the screenshot, there was a batch with 69.9K records, and there was a batch which took 1 s 473 ms. These two batches can be the same, or different, batches. TD On Wed, Feb 25, 2015 at 10:11 AM, Josh J joshjd...@gmail.com wrote: If I'm using the kafka receiver, can I assume the number of records processed in the batch is the sum of the number of records processed by the kafka receiver? So in the screenshot attached, the max rate of tuples processed in a batch is 42.7K + 27.2K = 69.9K tuples processed in a batch, with a max processing time of 1 second 473 ms? On Wed, Feb 25, 2015 at 8:48 AM, Akhil Das ak...@sigmoidanalytics.com wrote: By throughput you mean number of events processed etc.? The Streaming tab already has these statistics. Thanks Best Regards On Wed, Feb 25, 2015 at 9:59 PM, Josh J joshjd...@gmail.com wrote: On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das ak...@sigmoidanalytics.com wrote: For Spark Streaming applications, there is already a tab called Streaming which displays the basic statistics. Would I just need to extend this tab to add the throughput?
Re: Resource manager UI for Spark applications
Ted, If the application is running then the logs are not available. Plus, what I want to view is the details about the running app, as in the Spark UI. Do I have to open some ports or make some other setting changes? On Tue, Mar 3, 2015 at 10:08 AM, Ted Yu yuzhih...@gmail.com wrote: bq. changing the address with internal to the external one, but still does not work. Not sure what happened. For the time being, you can use the yarn command line to pull the container log (put in your appId and containerId): yarn logs -applicationId application_1386639398517_0007 -containerId container_1386639398517_0007_01_19 Cheers On Tue, Mar 3, 2015 at 9:50 AM, roni roni.epi...@gmail.com wrote: Hi Ted, I used s3://support.elasticmapreduce/spark/install-spark to install spark on my EMR cluster. It is 1.2.0. When I click on the link for history or logs it takes me to http://ip-172-31-43-116.us-west-2.compute.internal:9035/node/containerlogs/container_1424105590052_0070_01_01/hadoop and I get: The server at ip-172-31-43-116.us-west-2.compute.internal can't be found, because the DNS lookup failed. DNS is the network service that translates a website's name to its Internet address. This error is most often caused by having no connection to the Internet or a misconfigured network. It can also be caused by an unresponsive DNS server or a firewall preventing Google Chrome from accessing the network. I tried changing the internal address to the external one, but it still does not work. Thanks _roni On Tue, Mar 3, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote: bq. spark UI does not work for Yarn-cluster. Can you be a bit more specific on the error(s) you saw? What Spark release are you using? Cheers On Tue, Mar 3, 2015 at 8:53 AM, Rohini joshi roni.epi...@gmail.com wrote: Sorry for the half email - here it is again in full. Hi, I have 2 questions - 1. I was trying to use the Resource Manager UI for my Spark application using yarn cluster mode, as I observed that the Spark UI does not work for yarn-cluster mode. Is that correct or am I missing some setup? 2. When I click on Application Monitoring or History, I get re-directed to some link with an internal IP address. Even if I replace that address with the public IP, it still does not work. What kind of setup changes are needed for that? Thanks -roni On Tue, Mar 3, 2015 at 8:45 AM, Rohini joshi roni.epi...@gmail.com wrote: Hi, I have 2 questions - 1. I was trying to use the Resource Manager UI for my Spark application using yarn cluster mode, as I observed that the Spark UI does not work for yarn-cluster mode. Is that correct or am I missing some setup?
Re: Solve least square problem of the form min norm(A x - b)^2^ + lambda * n * norm(x)^2 ?
The minimization problem you're describing in the email title also looks like it could be solved using the RidgeRegression solver in MLlib, once you transform your DistributedMatrix into an RDD[LabeledPoint].

On Tue, Mar 3, 2015 at 11:02 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are a couple of solvers that I've written that are part of the AMPLab ml-matrix repo [1,2]. These aren't part of MLlib yet, though, and if you are interested in porting them I'd be happy to review it. Thanks Shivaram [1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala [2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala On Tue, Mar 3, 2015 at 9:01 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Is there a least squares solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of Spark? It seems that the only least squares solver available in Spark is private to the recommender package. Cheers, Jao
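A hedged sketch of that transformation, assuming the rows of the matrix A and the targets b can be zipped in matching order (the names rows, b, and lambda are hypothetical):

import org.apache.spark.mllib.regression.{LabeledPoint, RidgeRegressionWithSGD}

// rows: RDD[Vector], one row of A per element; b: RDD[Double], the targets
val labeled = rows.zip(b).map { case (row, target) => LabeledPoint(target, row) }
// (numIterations, stepSize, regParam) - the regParam plays the role of lambda
val model = RidgeRegressionWithSGD.train(labeled, 100, 1.0, lambda)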
Re: Why different numbers of partitions give different results for the same computation on the same dataset?
You can use DStream.transform() to do any arbitrary RDD transformations on the RDDs generated by a DStream.

val coalescedDStream = myDStream.transform { _.coalesce(...) }

On Tue, Mar 3, 2015 at 1:47 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Sorry, I made a mistake in my code. Please ignore my question number 2. Different numbers of partitions give *the same* results! On Tue, Mar 3, 2015 at 7:32 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I have a spark streaming application, running on a single node, consisting mainly of map operations. I perform repartitioning to control the number of CPU cores that I want to use. The code goes like this:

val ssc = new StreamingContext(sparkConf, Seconds(5))
val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")
val words = distFile.repartition(cores.toInt).flatMap(_.split(" "))
  .filter(_.length > 3)
val wordCharValues = words.map(word => {
  var sum = 0
  word.toCharArray.foreach(c => { sum += c.toInt })
  sum.toDouble / word.length.toDouble
}).foreachRDD(rdd => {
  println("MEAN: " + rdd.mean())
})

I have 2 questions: 1) How can I use coalesce in this code instead of repartition? 2) Why, using the same dataset (which is a small file processed within a single batch), does the result that I obtain for the mean vary with the number of partitions? If I don't call the repartition method, the result is always the same for every execution, as it should be. But repartitioning for instance into 2 partitions gives a different mean value than using 8 partitions. I really don't understand why, given that my code is deterministic. Can someone enlighten me on this? Thanks.
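For the coalesce question, a minimal sketch applying TD's suggestion to the code above (cores is the same parameter as in the original snippet):

val words = distFile.transform(rdd => rdd.coalesce(cores.toInt))
  .flatMap(_.split(" "))
  .filter(_.length > 3)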
Re: insert Hive table with RDD
Will this recognize the Hive partitions as well, for example inserting into a specific partition of a Hive table? On Tue, Mar 3, 2015 at 11:42 PM, Cheng, Hao hao.ch...@intel.com wrote: Using the SchemaRDD / DataFrame API via HiveContext. Assume you're using the latest code; something probably like:

val hc = new HiveContext(sc)
import hc.implicits._
existedRdd.toDF().insertInto("hivetable")

or

existedRdd.toDF().registerTempTable("mydata")
hc.sql("insert into hivetable as select xxx from mydata")

-Original Message- From: patcharee [mailto:patcharee.thong...@uni.no] Sent: Tuesday, March 3, 2015 7:09 PM To: user@spark.apache.org Subject: insert Hive table with RDD Hi, How can I insert an existing hive table with an RDD containing my data? Any examples? Best, Patcharee
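On the partition question, a hedged sketch using HiveQL's static-partition insert (the partition column dt and its value are made up; this assumes the target table was created with PARTITIONED BY):

existedRdd.toDF().registerTempTable("mydata")
hc.sql("INSERT INTO TABLE hivetable PARTITION (dt='2015-03-03') SELECT xxx FROM mydata")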
Re: Can not query TempTable registered by SQL Context using HiveContext
As it says in the API docs https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD, tables created with registerTempTable are local to the context that creates them: ... The lifetime of this temporary table is tied to the SQLContext https://spark.apache.org/docs/1.2.0/api/scala/org/apache/spark/sql/SQLContext.html that was used to create this SchemaRDD.

On Tue, Mar 3, 2015 at 5:52 AM, shahab shahab.mok...@gmail.com wrote: Hi, I did an experiment with Hive and SQL context: I queried Cassandra using CassandraAwareSQLContext (a custom SQL context from Calliope), then I registered the rdd as a temp table, next I tried to query it using HiveContext, but it seems that the Hive context can not see the table registered using the SQL context. Is this a normal case?

Stack trace:
ERROR hive.ql.metadata.Hive - NoSuchObjectException(message:default.MyTableName table not found)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1373)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103)

best, /Shahab
dynamically change receiver for a spark stream
Hi all, I have been trying to set up a stream using a custom receiver that would pick up data from Twitter, using the follow function to listen to just some users. I'd like to keep that streaming context running and dynamically change the custom receiver by adding the ids of users that I'd listen to.

String[] filterm = {};
long[] filteru = {};
TwitterReceiver receiver = new TwitterReceiver(
    twitter.getAuthorization(), filterm, filteru,
    StorageLevel.MEMORY_AND_DISK_2());
JavaDStream<Status> tweets = ssc.receiverStream(receiver);

It's working, but I need to update filteru by adding the ids of other users without stopping the stream. Thanks,
Re: Some questions after playing a little with the new ml.Pipeline.
I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now.

On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here is my current implementation with the current master version of spark:

class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

  override def transformSchema(...) { ... }

  override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
    transformSchema(dataSet.schema, paramMap, logging = true)
    val map = this.paramMap ++ paramMap
    val deepCNNFeature = udf((v: Vector) => {
      val cnnModel = new CaffeModel
      cnnModel.transform(v)
    }: Vector)
    dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
  }
}

where CaffeModel is a Java API to a Caffe C++ model. The problem here is that for every row it will create a new instance of CaffeModel, which is inefficient since creating a new model means loading a large model file. And it will transform only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that operates on a partition, in order to minimize the creation of CaffeModel instances and to take advantage of the Caffe network's batch processing?

On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley jos...@databricks.com wrote: I see, thanks for clarifying! I'd recommend following existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like "select ... from ...". I'm recommending looking at the existing implementation (rather than stating it here) because it changes between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column. Joseph

On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

class DeepCNNFeature extends Transformer ... {
  override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
    // How can I do a map partition on the underlying RDD and then add the column ?
  }
}

On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi Joseph, Thank you for the tips. I understand what I should do when my data are represented as an RDD. The thing that I can't figure out is how to do the same thing when the data is viewed as a DataFrame and I need to add the result of my pretrained model as a new column in the DataFrame. Precisely, I want to implement the following transformer:

class DeepCNNFeature extends Transformer ... {
}

On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley jos...@databricks.com wrote: Hi Jao, You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.:

myRDD.map { myDatum => preTrainedModel.predict(myDatum) }

If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model.
It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster. For the second question, I would modify the above call as follows:

myRDD.mapPartitions { myDataOnPartition =>
  val myModel = // instantiate neural network on this partition
  myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
}

I hope this helps! Joseph

On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, We mainly do large-scale computer vision tasks (image classification, retrieval, ...). The pipeline is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest spark summit ( http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html ) using the master version of spark pipeline and dataframe. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even though the tutorial is straightforward to reproduce with this new API, we still have some questions: - Can one use external tools (e.g. via pipe) as a pipeline stage? An example of use case is to extract feature learned with convolutional
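A hedged sketch of the broadcast suggestion at the top of this thread, reusing the asker's names (CaffeModel is their own wrapper and is assumed here to be serializable; this avoids re-loading the model per row, though each row is still transformed individually):

val cnnModel = new CaffeModel   // load the large model once, on the driver
val bcModel = dataSet.sqlContext.sparkContext.broadcast(cnnModel)
val deepCNNFeature = udf((v: Vector) => bcModel.value.transform(v): Vector)
dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))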
Re: Issue with yarn cluster - hangs in accepted state.
Do you have enough resources in your cluster? You can check your resource manager to see the usage. Thanks. Zhan Zhang

On Mar 3, 2015, at 8:51 AM, abhi abhishek...@gmail.com wrote: I am trying to run the below Java class on a YARN cluster, but it hangs in the ACCEPTED state. I don't see any error. Below are the class and the command. Any help is appreciated. Thanks, Abhi

bin/spark-submit --class com.mycompany.app.SimpleApp --master yarn-cluster /home/hduser/my-app-1.0.jar

{code}
public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "/home/hduser/testspark.txt"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}
{code}

15/03/03 11:47:40 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:41 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:42 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:43 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:44 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:45 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:46 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:47 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:48 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:49 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:50 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:51 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:52 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:53 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:54 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:55 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:56 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:57 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:58 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:47:59 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:48:00 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:48:01 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:48:02 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:48:03 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
15/03/03 11:48:04 INFO yarn.Client: Application report for application_1425398386987_0002 (state: ACCEPTED)
Does sc.newAPIHadoopFile support multiple directories (or nested directories)?
I did some experiments and it seems not. But I'd like to get confirmation (or perhaps I missed something). If it does support them, could you let me know how to specify multiple folders? Thanks. Senqiang
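One hedged workaround, sketched here with hypothetical directory names and assuming plain text input: build one RDD per directory and union them, which keeps the result as a single RDD regardless of how many folders there are.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val dirs = Seq("/data/2015-03-01", "/data/2015-03-02")
// sc.union merges the per-directory RDDs into one
val combined = sc.union(dirs.map(dir =>
  sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](dir)))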
Re: gc time too long when using mllib als
Also try 1.3.0-RC1 or the current master. ALS should perform much better in 1.3. -Xiangrui

On Tue, Mar 3, 2015 at 1:00 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to increase the parallelism/repartition the data to a higher number to get rid of those. Thanks Best Regards

On Tue, Mar 3, 2015 at 2:26 PM, lisendong lisend...@163.com wrote: Why is the GC time so long? I'm using ALS in MLlib, and the garbage collection time is too long (about 1/3 of the total time). I have tried some measures from the tuning spark guide, and tried to set the new generation memory, but it still does not work...

Tasks:
Index | ID   | Status  | Locality Level | Executor | Launch Time         | Duration | GC Time | Result Ser Time | Shuffle Read | Write Time | Shuffle Write | Errors
1     | 2801 | SUCCESS | PROCESS_LOCAL  | h1.zw    | 2015/03/03 16:35:15 | 8.6 min  | 3.3 min |                 | 1238.3 MB    | 57 ms      | 69.2 MB       |
0     | 2800 | SUCCESS | PROCESS_LOCAL  | h11.zw   | 2015/03/03 16:35:15 | 6.0 min  | 1.1 min |                 | 1261.0 MB    | 55 ms      | 68.6 MB       |
2     | 2802 | SUCCESS | PROCESS_LOCAL  | h9.zw    | 2015/03/03 16:35:15 | 5.0 min  | 1.5 min |                 | 834.4 MB     | 60 ms      | 69.6 MB       |
4     | 2804 | SUCCESS | PROCESS_LOCAL  | h4.zw    | 2015/03/03 16:35:15 | 4.4 min  | 59 s    |                 | 689.8 MB     | 62 ms      | 71.4 MB       |
3     | 2803 | SUCCESS | PROCESS_LOCAL  | h8.zw    | 2015/03/03 16:35:15 | 4.2 min  | 1.6 min |                 | 803.6 MB     | 66 ms      | 71.5 MB       |
7     | 2807 | SUCCESS | PROCESS_LOCAL  | h6.zw    | 2015/03/03 16:35:15 | 4.3 min  | 1.4 min |                 | 733.1 MB     | 9 s        | 66.5 MB       |
6     | 2806 | SUCCESS | PROCESS_LOCAL  | h10.zw   | 2015/03/03 16:35:15 | 6.4 min  | 3.1 min |                 | 950.5 MB     | 68 ms      | 69.3 MB       |
5     | 2805 | SUCCESS | PROCESS_LOCAL  | h3.zw    | 2015/03/03 16:35:15 | 8.0 min  | 2.7 min |                 | 1132.0 MB    | 64 ms      | 70.3 MB       |
8     | 2808 | SUCCESS | PROCESS_LOCAL  | h12.zw   | 2015/03/03 16:35:15 | 4.5 min  | 2.2 min |                 | 1304.2 MB    | 60 ms      | 69.4 MB       |
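As a concrete form of the repartitioning advice, a hedged sketch: MLlib's ALS also takes an explicit block count, which controls the parallelism inside the factorization (the value below is an assumption to tune; the other names follow the usual trainImplicit call):

import org.apache.spark.mllib.recommendation.ALS

val numBlocks = 100 // assumed; tune to your cluster, e.g. a small multiple of total executor cores
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, numBlocks, alpha)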
Re: Resource manager UI for Spark applications
In YARN (cluster or client mode), you can access the Spark UI while the app is running. After the app is done, you can still access it, but it needs some extra setup for the history server. Thanks. Zhan Zhang

On Mar 3, 2015, at 10:08 AM, Ted Yu yuzhih...@gmail.com wrote: bq. changing the address with internal to the external one, but still does not work. Not sure what happened. For the time being, you can use the yarn command line to pull the container log (put in your appId and containerId): yarn logs -applicationId application_1386639398517_0007 -containerId container_1386639398517_0007_01_19 Cheers On Tue, Mar 3, 2015 at 9:50 AM, roni roni.epi...@gmail.com wrote: Hi Ted, I used s3://support.elasticmapreduce/spark/install-spark to install spark on my EMR cluster. It is 1.2.0. When I click on the link for history or logs it takes me to http://ip-172-31-43-116.us-west-2.compute.internal:9035/node/containerlogs/container_1424105590052_0070_01_01/hadoop and I get: The server at ip-172-31-43-116.us-west-2.compute.internal can't be found, because the DNS lookup failed. DNS is the network service that translates a website's name to its Internet address. This error is most often caused by having no connection to the Internet or a misconfigured network. It can also be caused by an unresponsive DNS server or a firewall preventing Google Chrome from accessing the network. I tried changing the internal address to the external one, but it still does not work. Thanks _roni On Tue, Mar 3, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote: bq. spark UI does not work for Yarn-cluster. Can you be a bit more specific on the error(s) you saw? What Spark release are you using? Cheers On Tue, Mar 3, 2015 at 8:53 AM, Rohini joshi roni.epi...@gmail.com wrote: Sorry for the half email - here it is again in full. Hi, I have 2 questions - 1. I was trying to use the Resource Manager UI for my Spark application using yarn cluster mode, as I observed that the Spark UI does not work for yarn-cluster mode. Is that correct or am I missing some setup? 2. When I click on Application Monitoring or History, I get re-directed to some link with an internal IP address. Even if I replace that address with the public IP, it still does not work. What kind of setup changes are needed for that? Thanks -roni On Tue, Mar 3, 2015 at 8:45 AM, Rohini joshi roni.epi...@gmail.com wrote: Hi, I have 2 questions - 1. I was trying to use the Resource Manager UI for my Spark application using yarn cluster mode, as I observed that the Spark UI does not work for yarn-cluster mode. Is that correct or am I missing some setup?
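For the extra history-server setup Zhan mentions, a minimal sketch of the usual spark-defaults.conf entries (the host name and HDFS path are assumptions for your environment):

spark.eventLog.enabled            true
spark.eventLog.dir                hdfs:///var/log/spark-events
spark.yarn.historyServer.address  historyhost:18080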
Re: Why different numbers of partitions give different results for the same computation on the same dataset?
Sorry, I made a mistake in my code. Please ignore my question number 2. Different numbers of partitions give *the same* results! On Tue, Mar 3, 2015 at 7:32 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I have a spark streaming application, running on a single node, consisting mainly of map operations. I perform repartitioning to control the number of CPU cores that I want to use. The code goes like this:

val ssc = new StreamingContext(sparkConf, Seconds(5))
val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")
val words = distFile.repartition(cores.toInt).flatMap(_.split(" "))
  .filter(_.length > 3)
val wordCharValues = words.map(word => {
  var sum = 0
  word.toCharArray.foreach(c => { sum += c.toInt })
  sum.toDouble / word.length.toDouble
}).foreachRDD(rdd => {
  println("MEAN: " + rdd.mean())
})

I have 2 questions: 1) How can I use coalesce in this code instead of repartition? 2) Why, using the same dataset (which is a small file processed within a single batch), does the result that I obtain for the mean vary with the number of partitions? If I don't call the repartition method, the result is always the same for every execution, as it should be. But repartitioning for instance into 2 partitions gives a different mean value than using 8 partitions. I really don't understand why, given that my code is deterministic. Can someone enlighten me on this? Thanks.
Re: Resource manager UI for Spark applications
ah!! I think I know what you mean. My job was just in the ACCEPTED stage for a long time as it was running a huge file. But now that it is in the RUNNING stage, I can see it. I can see it at port 9046 though, instead of 4040. But I can see it. Thanks -roni

On Tue, Mar 3, 2015 at 1:19 PM, Zhan Zhang zzh...@hortonworks.com wrote: In YARN (cluster or client mode), you can access the Spark UI while the app is running. After the app is done, you can still access it, but it needs some extra setup for the history server. Thanks. Zhan Zhang On Mar 3, 2015, at 10:08 AM, Ted Yu yuzhih...@gmail.com wrote: bq. changing the address with internal to the external one, but still does not work. Not sure what happened. For the time being, you can use the yarn command line to pull the container log (put in your appId and containerId): yarn logs -applicationId application_1386639398517_0007 -containerId container_1386639398517_0007_01_19 Cheers On Tue, Mar 3, 2015 at 9:50 AM, roni roni.epi...@gmail.com wrote: Hi Ted, I used s3://support.elasticmapreduce/spark/install-spark to install spark on my EMR cluster. It is 1.2.0. When I click on the link for history or logs it takes me to http://ip-172-31-43-116.us-west-2.compute.internal:9035/node/containerlogs/container_1424105590052_0070_01_01/hadoop and I get: The server at ip-172-31-43-116.us-west-2.compute.internal can't be found, because the DNS lookup failed. DNS is the network service that translates a website's name to its Internet address. This error is most often caused by having no connection to the Internet or a misconfigured network. It can also be caused by an unresponsive DNS server or a firewall preventing Google Chrome from accessing the network. I tried changing the internal address to the external one, but it still does not work. Thanks _roni On Tue, Mar 3, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote: bq. spark UI does not work for Yarn-cluster. Can you be a bit more specific on the error(s) you saw? What Spark release are you using? Cheers On Tue, Mar 3, 2015 at 8:53 AM, Rohini joshi roni.epi...@gmail.com wrote: Sorry for the half email - here it is again in full. Hi, I have 2 questions - 1. I was trying to use the Resource Manager UI for my Spark application using yarn cluster mode, as I observed that the Spark UI does not work for yarn-cluster mode. Is that correct or am I missing some setup? 2. When I click on Application Monitoring or History, I get re-directed to some link with an internal IP address. Even if I replace that address with the public IP, it still does not work. What kind of setup changes are needed for that? Thanks -roni On Tue, Mar 3, 2015 at 8:45 AM, Rohini joshi roni.epi...@gmail.com wrote: Hi, I have 2 questions - 1. I was trying to use the Resource Manager UI for my Spark application using yarn cluster mode, as I observed that the Spark UI does not work for yarn-cluster mode. Is that correct or am I missing some setup?
how to save Word2VecModel
Hello, I started using Spark. I am working with Word2VecModel; however, I am not able to save the trained model. Here is what I am doing:

inp = sc.textFile("/Users/mediratta/code/word2vec/trunk-d/sub-5").map(lambda row: row.split(" "))
word2vec = Word2Vec()
model = word2vec.fit(inp)
out = open('abc.bin', 'wb')
pickle.dump(model, out, pickle.HIGHEST_PROTOCOL)

But I get this error:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

However, when I run pickle.dump on an argument of type list instead of Word2VecModel, then pickle.dump works fine. So it seems the error comes from the type of the first argument (Word2VecModel in this case), but the error message seems misleading. Any clue what I am doing wrong?
Re: Spark Streaming Switchover Time
Can you elaborate on what this switchover time is? TD

On Tue, Mar 3, 2015 at 9:57 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: Hi, On a standalone Spark 1.0.0 cluster with 1 master and 2 workers, operating in client mode and running a UDP streaming application, I am noting around a 2-second elapsed time on switchover, upon shutting down the streaming worker, where the streaming window length is 1 sec. I am wondering what parameters are available to the developer to shorten this switchover time. Cheers, Nastooh Avessta, Cisco Systems
Connecting a PHP/Java applications to Spark SQL Thrift Server
We have installed a hadoop cluster with hive and spark, and the Spark SQL Thrift server is up and running without any problem. Now we have a set of applications that need to use the Spark SQL Thrift server to query some data. Some of these applications are Java applications and the others are PHP applications. As an old-fashioned Java developer, I used to connect Java applications to DB servers like MySQL using a JDBC driver. Is there a corresponding driver for connecting to the Spark SQL Thrift server? Or what is the library I need to use to connect to it? For PHP, what are the ways we can use to connect PHP applications to the Spark SQL Thrift Server?
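For reference, a hedged sketch: the Spark SQL Thrift server speaks the HiveServer2 protocol, so the standard Hive JDBC driver applies. The host, port, credentials, and table name below are assumptions (10000 is the usual default port); the snippet is Scala, but the JDBC calls are identical from Java. For PHP, a Thrift-based HiveServer2 client library is the usual route, since it is the same wire protocol.

import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM mytable")
while (rs.next()) println(rs.getLong(1))
conn.close()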
Re: UnsatisfiedLinkError related to libgfortran when running MLLIB code on RHEL 5.8
libgfortran.x86_64 4.1.2-52.el5_8.1 comes with libgfortran.so.1 but not libgfortran.so.3, and JBLAS requires the latter. If you have root access, you can try to install a newer version of libgfortran. Otherwise, maybe you can try Spark 1.3, which doesn't use JBLAS in ALS. -Xiangrui

On Tue, Mar 3, 2015 at 11:21 AM, Prashant Sharma sharma.prashant.1...@gmail.com wrote: Hi Folks, We are trying to run the following code from the spark shell in a CDH 5.3 cluster running on RHEL 5.8.

spark-shell --master yarn --deploy-mode client --num-executors 15 --executor-cores 6 --executor-memory 12G

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
val users_item_score_clean = sc.textFile("/tmp/spark_mllib_test").map(_.split(","))
val ratings = users_item_score_clean.map(x => Rating(x(0).toInt, x(1).toInt, x(2).toDouble))
val rank = 10
val numIterations = 20
val alpha = 1.0
val lambda = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

We are getting the following error (the detailed error is attached in error.log):

-- org.jblas ERROR Couldn't load copied link file: java.lang.UnsatisfiedLinkError: /u08/hadoop/yarn/nm/usercache/sharma.p/appcache/application_1425015707226_0128/container_e12_1425015707226_0128_01_10/tmp/jblas7605371780736016929libjblas_arch_flavor.so: libgfortran.so.3: cannot open shared object file: No such file or directory.
On Linux 64bit, you need additional support libraries. You need to install libgfortran3.
For example for debian or Ubuntu, type sudo apt-get install libgfortran3
For more information, see https://github.com/mikiobraun/jblas/wiki/Missing-Libraries

15/03/02 14:50:25 ERROR executor.Executor: Exception in task 22.0 in stage 6.0 (TID 374)
java.lang.UnsatisfiedLinkError: org.jblas.NativeBlas.dposv(CII[DII[DII)I
at org.jblas.NativeBlas.dposv(Native Method)
at org.jblas.SimpleBlas.posv(SimpleBlas.java:369)

This exact code runs fine on another CDH 5.3 cluster which runs on RHEL 6.5. libgfortran.so.3 is not present on the problematic cluster:

[root@node04 ~]# find / -name libgfortran.so.3 2>/dev/null

I am able to find libgfortran.so.3 on the cluster where the above code works:

[root@workingclusternode04 ~]# find / -name libgfortran.so.3 2>/dev/null
/usr/lib64/libgfortran.so.3

The following output shows that the fortran packages are installed on both clusters:

On the cluster where this is not working:
[root@node04 ~]# yum list | grep -i fortran
gcc-gfortran.x86_64 4.1.2-52.el5_8.1 installed
libgfortran.i386 4.1.2-52.el5_8.1 installed
libgfortran.x86_64 4.1.2-52.el5_8.1 installed

On the cluster where the Spark job is working:
[root@workingclusternode04 ~]# yum list | grep -i fortran
Repository 'bda' is missing name in configuration, using id
compat-libgfortran-41.x86_64 4.1.2-39.el6 @bda
gcc-gfortran.x86_64 4.4.7-4.el6 @bda
libgfortran.x86_64 4.4.7-4.el6 @bda

Has anybody run into this? Any pointers are much appreciated. Regards, Prashant
Re: Can not query TempTable registered by SQL Context using HiveContext
Thanks Michael. I understand now. best, /Shahab On Tue, Mar 3, 2015 at 9:38 PM, Michael Armbrust mich...@databricks.com wrote: As it says in the API docs https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD, tables created with registerTempTable are local to the context that creates them: ... The lifetime of this temporary table is tied to the SQLContext https://spark.apache.org/docs/1.2.0/api/scala/org/apache/spark/sql/SQLContext.html that was used to create this SchemaRDD. On Tue, Mar 3, 2015 at 5:52 AM, shahab shahab.mok...@gmail.com wrote: Hi, I did an experiment with a Hive and a SQL context. I queried Cassandra using CassandraAwareSQLContext (a custom SQL context from Calliope), then I registered the RDD as a temp table; next I tried to query it using a HiveContext, but it seems that the Hive context cannot see the table registered using the SQL context. Is this a normal case? Stack trace: ERROR hive.ql.metadata.Hive - NoSuchObjectException(message:default.MyTableName table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1373) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103) best, /Shahab
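For reference, a minimal sketch of the fix (Spark 1.2-era API; the JSON file and table name are illustrative): register and query through the same context, for example a single HiveContext, which is itself a SQLContext.

import org.apache.spark.sql.hive.HiveContext

// Sketch only: one HiveContext both registers and queries the temp table.
// "people.json" and "MyTableName" are illustrative names.
val hiveCtx = new HiveContext(sc)
val schemaRdd = hiveCtx.jsonFile("people.json")
schemaRdd.registerTempTable("MyTableName")

// Visible here because the same context created it:
hiveCtx.sql("SELECT * FROM MyTableName").collect().foreach(println)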
Re: Having lots of FetchFailedException in join
Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) Caused by: java.net.ConnectException: Connection refused: 
lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) And I checked the executor on container host-; everything is good. Jianshi On Wed, Mar 4, 2015 at 12:28 PM, Aaron Davidson ilike...@gmail.com wrote: Drat! That doesn't help. Could you scan from the top to see if there were any fatal errors preceding these? Sometimes an OOM will cause this type of issue further down. On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: The failed executor has the following error messages. Any hints? 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5711039715419258699
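If the root cause turns out to be transient (for example an executor restarting under memory pressure), the Netty shuffle retry settings may paper over brief outages while the real failure is debugged. A sketch with illustrative values; both properties exist in Spark 1.2 and later:

import org.apache.spark.SparkConf

// Illustrative values: retry outstanding block fetches longer before
// surfacing a FetchFailedException (defaults are small).
val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "6")
  .set("spark.shuffle.io.retryWait", "10") // seconds between retries in 1.x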
java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
Hi, When I submit my spark job, I see the following runtime exception in the log: Exception in thread Thread-1 java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils at SparkHdfs.run(SparkHdfs.java:56) Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) I am including the assembly and streaming jars during submit: ./spark-submit --jars /Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar,/Users/hadoop/spark-1.2.1/external/kafka/target/spark-streaming-kafka_2.10-1.2.1.jar --class KafkaMain --master spark://krishs-mbp:7077 --deploy-mode cluster --num-executors 100 --driver-memory 1g --executor-memory 1g --supervise file:///Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar
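spark-streaming-kafka is not part of the Spark assembly, and in cluster mode --jars can be fragile, so a common fix is to bundle it (plus the Kafka client) into the application jar itself. A hedged build.sbt sketch, assuming the sbt-assembly plugin is configured; the versions mirror the 1.2.1 install above but are otherwise illustrative:

// build.sbt sketch (assumes the sbt-assembly plugin is configured).
name := "sparkhbase"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.2.1" // bundled into the fat jar
)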
Spark sql results can't be printed out to system console from spark streaming application
Dear all, I found the below sample code prints its output only in the spark shell; when I moved it into my Spark Streaming application, nothing was printed to the system console. Can you explain why this happens? Is it anything related to the new Spark context? Thanks a lot! val anotherPeopleRDD = sc_context.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) anotherPeopleRDD.toArray().foreach(line => System.out.println(line)) val jsonMessage = sqlContext.jsonRDD(anotherPeopleRDD) jsonMessage.toArray().foreach(line => System.out.println(line)) jsonMessage.registerTempTable("people") val test: SchemaRDD = sqlContext.sql("select count(*) from people") test.toArray().foreach(line => System.out.println(line)) Best regards, Cui Lin
Re: Spark sql results can't be printed out to system console from spark streaming application
Hi, can you explain how you copied that into your *streaming* application? Like, how do you issue the SQL, what data do you operate on, how do you view the logs etc.? Tobias On Wed, Mar 4, 2015 at 8:55 AM, Cui Lin cui@hds.com wrote: Dear all, I found the below sample code prints its output only in the spark shell; when I moved it into my Spark Streaming application, nothing was printed to the system console. Can you explain why this happens? Is it anything related to the new Spark context? Thanks a lot! val anotherPeopleRDD = sc_context.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) anotherPeopleRDD.toArray().foreach(line => System.out.println(line)) val jsonMessage = sqlContext.jsonRDD(anotherPeopleRDD) jsonMessage.toArray().foreach(line => System.out.println(line)) jsonMessage.registerTempTable("people") val test: SchemaRDD = sqlContext.sql("select count(*) from people") test.toArray().foreach(line => System.out.println(line)) Best regards, Cui Lin
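One common cause, for reference: in a streaming application the SQL has to run inside an output operation such as foreachRDD, which executes once per batch; code written outside the DStream operations runs only once at setup time. A hedged sketch (Spark 1.2-era API; the socket source, port, and table name are illustrative, and `sc` is assumed from the surrounding application):

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: one SQLContext reused across batches; println output
// appears on the driver's stdout.
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = new SQLContext(sc)
val lines = ssc.socketTextStream("localhost", 9999) // one JSON document per line

lines.foreachRDD { rdd =>
  if (rdd.count() > 0) { // jsonRDD cannot infer a schema from an empty batch
    val json = sqlContext.jsonRDD(rdd)
    json.registerTempTable("people")
    sqlContext.sql("select count(*) from people")
      .collect()
      .foreach(row => System.out.println(row))
  }
}
ssc.start()
ssc.awaitTermination()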
Re: Spark Streaming Switchover Time
I am confused. Are you killing the 1st worker node to see whether the system restarts the receiver on the second worker? TD On Tue, Mar 3, 2015 at 10:49 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: This is the time that it takes for the driver to start receiving data once again, from the 2nd worker, when the 1st worker, where the streaming thread was initially running, is shut down. Cheers, Nastooh Avessta From: Tathagata Das [mailto:t...@databricks.com] Sent: Tuesday, March 03, 2015 10:24 PM To: Nastooh Avessta (navesta) Cc: user@spark.apache.org Subject: Re: Spark Streaming Switchover Time Can you elaborate on what is this switchover time? TD On Tue, Mar 3, 2015 at 9:57 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: Hi On a standalone Spark 1.0.0 cluster, with 1 master and 2 workers, operating in client mode, running a UDP streaming application, I am noting around a 2-second elapse time on switchover, upon shutting down the streaming worker, where the streaming window length is 1 sec. I am wondering what parameters are available to the developer to shorten this switchover time. Cheers, Nastooh Avessta
Re: Some questions after playing a little with the new ml.Pipeline.
I think it would be interesting to have the equivalent of mapPartitions with DataFrame. There are many use cases where data are processed in batch. Another example is a simple linear classifier Ax=b, where A is the matrix of feature vectors, x the model and b the output. Here again the product Ax can be done efficiently for a batch of data. I will test the broadcast hack. But I'm wondering whether it is possible to append or zip an RDD as a new column of a DataFrame. The idea is to do mapPartitions on the RDD of the input column and then add the result as the output column? Jao On 3 Mar 2015, at 22:04, Joseph Bradley jos...@databricks.com wrote: I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now. On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here is my current implementation with the current master version of Spark: class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... { override def transformSchema(...) { ... } override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = { transformSchema(dataSet.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap val deepCNNFeature = udf((v: Vector) => { val cnnModel = new CaffeModel cnnModel.transform(v) } : Vector) dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol)))) } } where CaffeModel is a Java API to the Caffe C++ model. The problem here is that for every row it will create a new instance of CaffeModel, which is inefficient since creating a new model means loading a large model file; and it will transform only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that operates on a partition in order to minimize the creation of CaffeModels and to take advantage of the Caffe network's batch processing? On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley jos...@databricks.com wrote: I see, thanks for clarifying! I'd recommend following existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like "select ... from ...". I'm recommending looking at the existing implementation (rather than stating it here) because it changes between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column. Joseph On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: class DeepCNNFeature extends Transformer ... { override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = { // How can I do a mapPartitions on the underlying RDD and then add the column? } } On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi Joseph, Thank you for the tips. I understand what I should do when my data are represented as an RDD. The thing that I can't figure out is how to do the same thing when the data is viewed as a DataFrame and I need to add the result of my pretrained model as a new column in the DataFrame. Precisely, I want to implement the following transformer: class DeepCNNFeature extends Transformer ...
{ } On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley jos...@databricks.com wrote: Hi Jao, You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.: myRDD.map { myDatum => preTrainedModel.predict(myDatum) } If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster. For the second question, I would modify the above call as follows: myRDD.mapPartitions { myDataOnPartition => val myModel = // instantiate neural network on this partition myDataOnPartition.map { myDatum => myModel.predict(myDatum) } } I hope this helps! Joseph On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, We
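For reference, a hedged sketch combining the two suggestions above: broadcast what is cheap to ship and instantiate the heavy model once per partition. CaffeModel, its constructor argument, and inputRdd are placeholders standing in for this thread's own types, not real APIs:

// Placeholders: CaffeModel and inputRdd stand in for the thread's own types.
val modelPath = "/path/to/caffe/model"    // hypothetical
val bcModelPath = sc.broadcast(modelPath) // broadcast the cheap part

val features = inputRdd.mapPartitions { rows =>
  // Loaded once per partition instead of once per row:
  val cnnModel = new CaffeModel(bcModelPath.value)
  // Feed the network in batches to exploit Caffe's batch processing:
  rows.grouped(64).flatMap(batch => batch.map(v => cnnModel.transform(v)))
}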
RE: Connecting a PHP/Java applications to Spark SQL Thrift Server
SparkSQL supports JDBC/ODBC connectivity, so if that's the route you need or want to connect through, you could do so via Java/PHP apps. I haven't used either, so I can't speak to the developer experience; I assume it's pretty good, as it would be the preferred method for lots of third-party enterprise apps/tooling. If you prefer using the Thrift server/interface, and if they don't already exist in open source land, you can use the Thrift definitions to generate client libs in any supported Thrift language and use those for connectivity. One issue with the Thrift server seems to arise when running in cluster mode. It looks like it still exists, but the UX of the error has been cleaned up in 1.3: https://issues.apache.org/jira/browse/SPARK-5176 -Original Message- From: fanooos [mailto:dev.fano...@gmail.com] Sent: Tuesday, March 3, 2015 11:15 PM To: user@spark.apache.org Subject: Connecting a PHP/Java applications to Spark SQL Thrift Server We have installed a Hadoop cluster with Hive and Spark, and the Spark SQL Thrift server is up and running without any problem. Now we have a set of applications that need to use the Spark SQL Thrift server to query some data. Some of these applications are Java applications and the others are PHP applications. As I am an old-fashioned Java developer, I used to connect Java applications to DB servers like MySQL using a JDBC driver. Is there a corresponding driver for connecting to the Spark SQL Thrift server? Or what is the library I need to use to connect to it? For PHP, what are the ways we can use to connect PHP applications to the Spark SQL Thrift Server?
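A minimal sketch of the JDBC route mentioned at the top of this reply, assuming the HiveServer2 JDBC driver is on the classpath (the Thrift server speaks the HiveServer2 protocol) and the server listens on its default port 10000; host, credentials, and query are illustrative:

import java.sql.DriverManager

// Assumes hive-jdbc and its dependencies are on the classpath.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
try {
  val rs = conn.createStatement().executeQuery("SHOW TABLES")
  while (rs.next()) println(rs.getString(1))
} finally {
  conn.close()
}

The same jdbc:hive2:// URL should work from PHP through an ODBC bridge or a HiveServer2 Thrift client library, though that side is untested here.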
Re: One of the executor not getting StopExecutor message
Not quite sure, but you can try increasing spark.akka.threads; most likely it can be a YARN-related issue. Thanks Best Regards On Tue, Mar 3, 2015 at 3:38 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, The operations are not very extensive, as this scenario is not always reproducible. One of the executors starts behaving in this manner. For this particular application, we are using 8 cores in one executor, and practically, 4 executors are launched on one machine. This machine has a good config with respect to number of cores. Somehow, to me it seems to be some Akka communication issue. If I try to take a thread dump of the executor once it appears to be in trouble, then a timeout happens. Can it be something related to spark.akka.threads? On Fri, Feb 27, 2015 at 3:55 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Mostly, that particular executor is stuck on a GC pause. What operation are you performing? You can try increasing the parallelism if you see only 1 executor is doing the task. Thanks Best Regards On Fri, Feb 27, 2015 at 11:39 AM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, I am running a Spark application on YARN in cluster mode. One of my executors appears to be in a hung state for a long time, and gets finally killed by the driver. As compared to other executors, it has not received the StopExecutor message from the driver. Here are the logs at the end of this container (C_1): 15/02/26 18:17:07 DEBUG storage.BlockManagerSlaveActor: Done removing broadcast 36, response is 2 15/02/26 18:17:07 DEBUG storage.BlockManagerSlaveActor: Sent response: 2 to Actor[akka.tcp://sparkDriver@TMO-DN73:37906/temp/$aB] 15/02/26 18:17:09 DEBUG ipc.Client: IPC Client (1206963429) connection to TMO-GCR70/192.168.162.70:9000 from admin: closed 15/02/26 18:17:09 DEBUG ipc.Client: IPC Client (1206963429) connection to TMO-GCR70/192.168.162.70:9000 from admin: stopped, remaining connections 0 15/02/26 18:17:32 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for [] with renew id 1 executed 15/02/26 18:18:00 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for [] with renew id 1 expired 15/02/26 18:18:00 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for [] with renew id 1 exited 15/02/26 20:33:13 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM NOTE that it has no logs for more than 2 hrs. Here are the logs at the end of a normal container (C_2): 15/02/26 20:33:09 DEBUG storage.BlockManagerSlaveActor: Sent response: 2 to Actor[akka.tcp://sparkDriver@TMO-DN73:37906/temp/$D+b] 15/02/26 20:33:10 DEBUG executor.CoarseGrainedExecutorBackend: [actor] received message StopExecutor from Actor[akka.tcp://sparkDriver@TMO-DN73:37906/user/CoarseGrainedScheduler#160899257] 15/02/26 20:33:10 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown 15/02/26 20:33:10 INFO storage.MemoryStore: MemoryStore cleared 15/02/26 20:33:10 INFO storage.BlockManager: BlockManager stopped 15/02/26 20:33:10 DEBUG executor.CoarseGrainedExecutorBackend: [actor] handled message (181.499835 ms) StopExecutor from Actor[akka.tcp://sparkDriver@TMO-DN73:37906/user/CoarseGrainedScheduler#160899257] 15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/02/26 20:33:10 DEBUG ipc.Client: stopping client from cache: org.apache.hadoop.ipc.Client@76a68bd4 15/02/26 20:33:10 DEBUG ipc.Client: stopping client from cache: org.apache.hadoop.ipc.Client@76a68bd4 15/02/26 20:33:10 DEBUG ipc.Client: removing client from cache: org.apache.hadoop.ipc.Client@76a68bd4 15/02/26 20:33:10 DEBUG ipc.Client: stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@76a68bd4 15/02/26 20:33:10 DEBUG ipc.Client: Stopping client 15/02/26 20:33:10 DEBUG storage.DiskBlockManager: Shutdown hook called 15/02/26 20:33:10 DEBUG util.Utils: Shutdown hook called At the driver side, I can see the logs related to heartbeat messages from C_1 till 20:05:00: 15/02/26 20:05:00 DEBUG spark.HeartbeatReceiver: [actor] received message Heartbeat(7,[Lscala.Tuple2;@151e5ce6,BlockManagerId(7, TMO-DN73, 34106)) from Actor[akka.tcp://sparkExecutor@TMO-DN73:43671/temp/$fn] After this, it continues to receive the heartbeat from other executors except this one, and here follows the message responsible for its SIGTERM:
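For reference, spark.akka.threads is a real Spark 1.x setting (default 4) controlling the Akka dispatcher pool; a sketch with an illustrative value, in case message handling is starved on the large executors described above. Whether it actually resolves this hang is unverified:

import org.apache.spark.SparkConf

// Illustrative value only; the default is 4 in Spark 1.x.
val conf = new SparkConf().set("spark.akka.threads", "8")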
RE: Spark Streaming Switchover Time
This is the time that it takes for the driver to start receiving data once again, from the 2nd worker, when the 1st worker, where streaming thread was initially running, is shutdown. Cheers, Nastooh Avessta From: Tathagata Das [mailto:t...@databricks.com] Sent: Tuesday, March 03, 2015 10:24 PM To: Nastooh Avessta (navesta) Cc: user@spark.apache.org Subject: Re: Spark Streaming Switchover Time Can you elaborate on what is this switchover time? TD On Tue, Mar 3, 2015 at 9:57 PM, Nastooh Avessta (navesta) nave...@cisco.com wrote: Hi On a standalone, Spark 1.0.0, with 1 master and 2 workers, operating in client mode, running a udp streaming application, I am noting around 2 second elapse time on switchover, upon shutting down the streaming worker, where streaming window length is 1 sec. I am wondering what parameters are available to the developer to shorten this switchover time. Cheers, Nastooh Avessta
Re: Does sc.newAPIHadoopFile support multiple directories (or nested directories)?
Thanks Ted. Actually a follow-up question: I need to read multiple HDFS files into an RDD. What I am doing now is: for each file I read it into an RDD, then later on I union all these RDDs into one RDD. I am not sure if it is the best way to do it. Thanks, Senqiang On Tuesday, March 3, 2015 2:40 PM, Ted Yu yuzhih...@gmail.com wrote: Looking at the scaladoc: /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] Your conclusion is confirmed. On Tue, Mar 3, 2015 at 1:59 PM, S. Zhou myx...@yahoo.com.invalid wrote: I did some experiments and it seems not. But I'd like to get confirmation (or perhaps I missed something). If it does support it, could you let me know how to specify multiple folders? Thanks. Senqiang
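For what it's worth, sc.textFile (and the hadoopFile variants) accept comma-separated paths and globs, so the per-file loop plus union is usually avoidable; sc.union also collapses many RDDs in one call. A sketch with illustrative paths:

// Comma-separated paths and globs in a single call:
val combined = sc.textFile("hdfs:///logs/2015/03/01,hdfs:///logs/2015/03/02")
val globbed  = sc.textFile("hdfs:///logs/2015/03/*")

// If you already have many RDDs, sc.union avoids a long chain of .union calls:
val rdds = Seq("a.txt", "b.txt", "c.txt").map(p => sc.textFile(p))
val all  = sc.union(rdds)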
Re: Does sc.newAPIHadoopFile support multiple directories (or nested directories)?
Looking at FileInputFormat#listStatus(): // Whether we need to recursive look into the directory structure boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false); where: public static final String INPUT_DIR_RECURSIVE = "mapreduce.input.fileinputformat.input.dir.recursive"; FYI On Tue, Mar 3, 2015 at 3:14 PM, Stephen Boesch java...@gmail.com wrote: The sc.textFile() invokes the Hadoop FileInputFormat via the (subclass) TextInputFormat. Inside, the logic does exist to do the recursive directory reading - i.e. first detecting if an entry were a directory and if so then descending:

for (FileStatus globStat: matches) {
  if (globStat.isDir()) {
    for (FileStatus stat: fs.listStatus(globStat.getPath(), inputFilter)) {
      result.add(stat);
    }
  } else {
    result.add(globStat);
  }
}
Re: Does sc.newAPIHadoopFile support multiple directories (or nested directories)?
Thanks for the confirmation, Stephen. On Tue, Mar 3, 2015 at 3:53 PM, Stephen Boesch java...@gmail.com wrote: Thanks, I was looking at an old version of FileInputFormat. BEFORE setting the recursive config (mapreduce.input.fileinputformat.input.dir.recursive): scala> sc.textFile("dev/*").count java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build The default is null/not set, which is evaluated as false: scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive") res1: String = null AFTER: Now set the value: sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true") scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive") res4: String = true scala> sc.textFile("dev/*").count .. res5: Long = 3481 So it works. 2015-03-03 15:26 GMT-08:00 Ted Yu yuzhih...@gmail.com: Looking at FileInputFormat#listStatus(): // Whether we need to recursive look into the directory structure boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false); where: public static final String INPUT_DIR_RECURSIVE = "mapreduce.input.fileinputformat.input.dir.recursive"; FYI On Tue, Mar 3, 2015 at 3:14 PM, Stephen Boesch java...@gmail.com wrote: The sc.textFile() invokes the Hadoop FileInputFormat via the (subclass) TextInputFormat. Inside, the logic does exist to do the recursive directory reading - i.e. first detecting if an entry were a directory and if so then descending:

for (FileStatus globStat: matches) {
  if (globStat.isDir()) {
    for (FileStatus stat: fs.listStatus(globStat.getPath(), inputFilter)) {
      result.add(stat);
    }
  } else {
    result.add(globStat);
  }
}
Re: ImportError: No module named iter ... (on CDH5 v1.2.0+cdh5.3.2+369-1.cdh5.3.2.p0.17.el6.noarch) ...
Weird python errors like this generally mean you have different versions of python in the nodes of your cluster. Can you check that? On Tue, Mar 3, 2015 at 4:21 PM, subscripti...@prismalytics.io subscripti...@prismalytics.io wrote: Hi Friends: We noticed the following in 'pyspark' happens when running in distributed Standalone Mode (MASTER=spark://vps00:7077), but not in Local Mode (MASTER=local[n]). See the following (again, the problem only happens in Standalone Mode). Any ideas? Thank you in advance! =:) >>> rdd = sc.textFile('file:///etc/hosts') >>> rdd.first() Traceback (most recent call last): File "<input>", line 1, in <module> File "/usr/lib/spark/python/pyspark/rdd.py", line 1129, in first rs = self.take(1) File "/usr/lib/spark/python/pyspark/rdd.py", line , in take res = self.context.runJob(self, takeUpToNumLeft, p, True) File "/usr/lib/spark/python/pyspark/context.py", line 818, in runJob it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ self.target_id, self.name) File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value format(target_id, '.', name), value) Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, vps03): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/lib/spark/python/pyspark/worker.py", line 107, in main process() File "/usr/lib/spark/python/pyspark/worker.py", line 98, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/lib/spark/python/pyspark/serializers.py", line 227, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/usr/lib/spark/python/pyspark/rdd.py", line 1106, in takeUpToNumLeft --- See around line 1106 of this file in the CDH5 Spark Distribution. while taken < left: ImportError: No module named iter # But iter() exists as a built-in (not as a module) ... >>> iter(range(10)) <listiterator object at 0x423ff10> cluster$ rpm -qa | grep -i spark [ ... ] spark-python-1.2.0+cdh5.3.2+369-1.cdh5.3.2.p0.17.el6.noarch spark-core-1.2.0+cdh5.3.2+369-1.cdh5.3.2.p0.17.el6.noarch spark-worker-1.2.0+cdh5.3.2+369-1.cdh5.3.2.p0.17.el6.noarch spark-master-1.2.0+cdh5.3.2+369-1.cdh5.3.2.p0.17.el6.noarch Thank you! Team Prismalytics -- Marcelo
Re: Issue with yarn cluster - hangs in accepted state.
Hi, On Wed, Mar 4, 2015 at 6:20 AM, Zhan Zhang zzh...@hortonworks.com wrote: Do you have enough resource in your cluster? You can check your resource manager to see the usage. Yep, I can confirm that this is a very annoying issue. If there is not enough memory or VCPUs available, your app will just stay in ACCEPTED state until resources are available. You can have a look at https://github.com/jubatus/jubaql-docker/blob/master/hadoop/yarn-site.xml#L35 to see some settings that might help. Tobias
Re: Does sc.newAPIHadoopFile support multiple directories (or nested directories)?
Thanks, I was looking at an old version of FileInputFormat. BEFORE setting the recursive config (mapreduce.input.fileinputformat.input.dir.recursive): scala> sc.textFile("dev/*").count java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build The default is null/not set, which is evaluated as false: scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive") res1: String = null AFTER: Now set the value: sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true") scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive") res4: String = true scala> sc.textFile("dev/*").count .. res5: Long = 3481 So it works. 2015-03-03 15:26 GMT-08:00 Ted Yu yuzhih...@gmail.com: Looking at FileInputFormat#listStatus(): // Whether we need to recursive look into the directory structure boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false); where: public static final String INPUT_DIR_RECURSIVE = "mapreduce.input.fileinputformat.input.dir.recursive"; FYI On Tue, Mar 3, 2015 at 3:14 PM, Stephen Boesch java...@gmail.com wrote: The sc.textFile() invokes the Hadoop FileInputFormat via the (subclass) TextInputFormat. Inside, the logic does exist to do the recursive directory reading - i.e. first detecting if an entry were a directory and if so then descending:

for (FileStatus globStat: matches) {
  if (globStat.isDir()) {
    for (FileStatus stat: fs.listStatus(globStat.getPath(), inputFilter)) {
      result.add(stat);
    }
  } else {
    result.add(globStat);
  }
}
TreeNodeException: Unresolved attributes
Hi, I am trying to run a simple select query on a table. val restaurants = hiveCtx.hql("select * from TableName where column like '%SomeString%'") This gives an error as below: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: How do I solve this? -- Regards, Anusha
Re: Does sc.newAPIHadoopFile support multiple directories (or nested directories)?
The sc.textFile() invokes the Hadoop FileInputFormat via the (subclass) TextInputFormat. Inside, the logic does exist to do the recursive directory reading - i.e. first detecting if an entry were a directory and if so then descending:

for (FileStatus globStat: matches) {
  if (globStat.isDir()) {
    for (FileStatus stat: fs.listStatus(globStat.getPath(), inputFilter)) {
      result.add(stat);
    }
  } else {
    result.add(globStat);
  }
}
Re: LBFGS optimizer performance
I would recommend caching; if you can't persist, iterative algorithms will not work well. I don't think calling count on the dataset is problematic; every iteration in LBFGS iterates over the whole dataset and does a lot more computation than count(). It would be helpful to see some error occurring within LBFGS. With the given stack trace, I'm not sure what part of LBFGS it's happening in. On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres gsala...@ime.usp.br wrote: Yeah, I can call count before that and it works. Also, I was over-caching tables, but I removed those. Now there is no caching, but it gets really slow since it calculates my table RDD many times. I also hacked the LBFGS code to pass in the number of examples, which I calculated outside in a Spark SQL query, but that just moved the location of the problem. The query I'm running looks like this: s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB ON tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status=''" mappedFields contains a list of fields which I'm interested in. The result of that query goes through some transformations (including sampling) before being input to LBFGS. My dataset has 180GB just for feature selection; I'm planning to use 450GB to train the final model, and I'm using 16 c3.2xlarge EC2 instances, which means I have 240GB of RAM available. Any suggestion? I'm starting to check the algorithm because I don't understand why it needs to count the dataset. Thanks Gustavo On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley jos...@databricks.com wrote: Is that error actually occurring in LBFGS? It looks like it might be happening before the data even gets to LBFGS. (Perhaps the outer join you're trying to do is making the dataset size explode a bit.) Are you able to call count() (or any RDD action) on the data before you pass it to LBFGS? On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres gsala...@ime.usp.br wrote: Just did, with the same error. I think the problem is the data.count() call in LBFGS, because for huge datasets that's naive to do. I was thinking of writing my own version of LBFGS that, instead of doing data.count(), takes that parameter, which I will calculate from a Spark SQL query. I will let you know. Thanks On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you try increasing your driver memory, reducing the executors and increasing the executor memory? Thanks Best Regards On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres gsala...@ime.usp.br wrote: Hi there: I'm using the LBFGS optimizer to train a logistic regression model. The code I implemented follows the pattern shown in https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html, but the training data is obtained from a Spark SQL RDD. The problem I'm having is that LBFGS tries to count the elements in my RDD, and that results in an OOM exception since my dataset is huge. I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on Hadoop YARN. My dataset is about 150 GB, but I sample it (I take only 1% of the data) in order to scale logistic regression.
The exception I'm getting is this: 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal): java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.<init>(String.java:203) at com.esotericsoftware.kryo.io.Input.readString(Input.java:448) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
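Following the caching recommendation at the top of this thread, a sketch; `training` is a placeholder for the (sampled) RDD built from the SQL query, and MEMORY_AND_DISK spills partitions to disk rather than failing when memory runs short:

import org.apache.spark.storage.StorageLevel

// Placeholder: `training` stands for the RDD fed into LBFGS above.
val cached = training.persist(StorageLevel.MEMORY_AND_DISK)
cached.count() // materialize once, so each LBFGS iteration reuses it
// ... pass `cached` to LBFGS ...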
ImportError: No module named iter ... (on CDH5 v1.2.0+cdh5.3.2+369-1.cdh5.3.2.p0.17.el6.noarch) ...
Hi Friends: We noticed the following in 'pyspark' happens when running in distributed Standalone Mode (MASTER=spark://vps00:7077), but not in Local Mode (MASTER=local[n]). See the following (again, the problem only happens in Standalone Mode). Any ideas? Thank you in advance! =:) >>> rdd = sc.textFile('file:///etc/hosts') >>> rdd.first() Traceback (most recent call last): File "<input>", line 1, in <module> File "/usr/lib/spark/python/pyspark/rdd.py", line 1129, in first rs = self.take(1) File "/usr/lib/spark/python/pyspark/rdd.py", line , in take res = self.context.runJob(self, takeUpToNumLeft, p, True) File "/usr/lib/spark/python/pyspark/context.py", line 818, in runJob it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ self.target_id, self.name) File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value format(target_id, '.', name), value) Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, vps03): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/lib/spark/python/pyspark/worker.py", line 107, in main process() File "/usr/lib/spark/python/pyspark/worker.py", line 98, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/lib/spark/python/pyspark/serializers.py", line 227, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/usr/lib/spark/python/pyspark/rdd.py", line 1106, in takeUpToNumLeft --- See around line 1106 of this file in the CDH5 Spark Distribution. while taken < left: ImportError: No module named iter # But iter() exists as a built-in (not as a module) ... >>> iter(range(10)) <listiterator object at 0x423ff10> cluster$ rpm -qa | grep -i spark [ ... ] spark-python-1.2.0+cdh5.3.2+369-1.cdh5.3.2.p0.17.el6.noarch spark-core-1.2.0+cdh5.3.2+369-1.cdh5.3.2.p0.17.el6.noarch spark-worker-1.2.0+cdh5.3.2+369-1.cdh5.3.2.p0.17.el6.noarch spark-master-1.2.0+cdh5.3.2+369-1.cdh5.3.2.p0.17.el6.noarch Thank you! Team Prismalytics
RE: Spark SQL Thrift Server start exception : java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
Which version / distribution are you using? Please reference the blog that Felix C posted if you’re running on CDH: http://eradiating.wordpress.com/2015/02/22/getting-hivecontext-to-work-in-cdh/ Or you may also need to download the datanucleus*.jar files and try adding the “--jars” option when starting the spark shell.

From: Anusha Shamanur [mailto:anushas...@gmail.com] Sent: Wednesday, March 4, 2015 5:07 AM To: Cheng, Hao Subject: Re: Spark SQL Thrift Server start exception : java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory

Hi, I am getting the same error. There is no lib folder in my $SPARK_HOME, but I included these jars while calling spark-shell. Now I get this:

Caused by: org.datanucleus.exceptions.ClassNotResolvedException: Class org.datanucleus.store.rdbms.RDBMSStoreManager was not found in the CLASSPATH. Please check your specification and your CLASSPATH.
    at org.datanucleus.ClassLoaderResolverImpl.classForName(ClassLoaderResolverImpl.java:218)

How do I solve this?

On Mon, Mar 2, 2015 at 11:04 PM, Cheng, Hao hao.ch...@intel.com wrote: Copy those jars into $SPARK_HOME/lib/: datanucleus-api-jdo-3.2.6.jar, datanucleus-core-3.2.10.jar, datanucleus-rdbms-3.2.9.jar. See https://github.com/apache/spark/blob/master/bin/compute-classpath.sh#L120

-Original Message- From: fanooos [mailto:dev.fano...@gmail.com] Sent: Tuesday, March 3, 2015 2:50 PM To: user@spark.apache.org Subject: Spark SQL Thrift Server start exception : java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory

I have installed a hadoop cluster (version 2.6.0), apache spark (version 1.2.1, pre-built for hadoop 2.4 and later), and hive (version 1.0.0). When I try to start the spark sql thrift server I get the following exception.
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231)
    at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229)
    at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229)
    at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229)
    at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:292)
    at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
    at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:248)
    at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:91)
    at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:90)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:90)
    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:72)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:51)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:56)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
    ... 26 more
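Once the datanucleus jars are visible to the driver (copied into $SPARK_HOME/lib/ or passed with --jars), a quick sanity check is to build a HiveContext in spark-shell, since that is the step that instantiates the HiveMetaStoreClient. A minimal sketch against the Spark 1.2 API, assuming only that sc is the shell's SparkContext and that the metastore is reachable:

    import org.apache.spark.sql.hive.HiveContext

    // Constructing the HiveContext and running a trivial statement forces
    // the metastore client to be created, so a missing datanucleus jar
    // fails here rather than later inside the Thrift server.
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("SHOW TABLES").collect().foreach(println)

If this works in the shell but the Thrift server still fails, the jars are most likely missing from the server's classpath rather than from the shell's.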
Re: LBFGS optimizer performance
Yeah, I can call count before that and it works. Also, I was over-caching tables, but I removed those. Now there is no caching, but it gets really slow since it computes my table RDD many times. I also hacked the LBFGS code to pass in the number of examples, which I calculated outside in a Spark SQL query, but that just moved the location of the problem.

The query I'm running looks like this:

    s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB ON tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status=''"

mappedFields contains a list of fields which I'm interested in. The result of that query goes through some transformations (including sampling) before being input to LBFGS. My dataset is 180 GB just for feature selection; I'm planning to use 450 GB to train the final model, and I'm using 16 c3.2xlarge EC2 instances, which means I have 240 GB of RAM available. Any suggestion? I'm starting to look at the algorithm because I don't understand why it needs to count the dataset. Thanks, Gustavo

On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley jos...@databricks.com wrote: Is that error actually occurring in LBFGS? It looks like it might be happening before the data even gets to LBFGS. (Perhaps the outer join you're trying to do is making the dataset size explode a bit.) Are you able to call count() (or any RDD action) on the data before you pass it to LBFGS?

On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres gsala...@ime.usp.br wrote: Just did, with the same error. I think the problem is the data.count() call in LBFGS, because for huge datasets that's naive to do. I was thinking of writing my own version of LBFGS, but instead of doing data.count() I would pass in that parameter, which I would calculate from a Spark SQL query. I will let you know. Thanks

On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you try increasing your driver memory, reducing the executors and increasing the executor memory? Thanks Best Regards

On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres gsala...@ime.usp.br wrote: Hi there: I'm using the LBFGS optimizer to train a logistic regression model. The code I implemented follows the pattern shown in https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html, but the training data is obtained from a Spark SQL RDD. The problem I'm having is that LBFGS tries to count the elements in my RDD, and that results in an OOM exception since my dataset is huge. I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on Hadoop YARN. My dataset is about 150 GB, but I sample it (taking only 1% of the data) in order to scale logistic regression.
The exception I'm getting is this:

15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at
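One way to avoid recomputing the SQL-derived RDD on every pass (and on LBFGS's internal count()) is to persist it once in serialized form, so partitions spill to disk instead of exhausting the heap. Below is a minimal sketch against the MLlib 1.2 optimization API; the two-row dataset is a hypothetical stand-in for the real query result, and the hyperparameter values are placeholders:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
    import org.apache.spark.storage.StorageLevel

    // Stand-in for the RDD[(label, features)] produced by the Spark SQL query.
    val training = sc.parallelize(Seq(
      (1.0, Vectors.dense(0.5, 1.2)),
      (0.0, Vectors.dense(-0.3, 0.8))))
      .persist(StorageLevel.MEMORY_AND_DISK_SER)
    training.count()  // materialize once; LBFGS's own count() then hits the cache

    val numFeatures = training.first()._2.size
    val (weights, lossHistory) = LBFGS.runLBFGS(
      training,
      new LogisticGradient(),
      new SquaredL2Updater(),
      10,    // numCorrections
      1e-4,  // convergenceTol
      100,   // maxNumIterations
      0.1,   // regParam
      Vectors.dense(new Array[Double](numFeatures)))

MEMORY_AND_DISK_SER trades CPU for memory: partitions stay serialized in RAM and overflow to disk, which is usually the right choice when the working set approaches the cluster's total memory.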
Re: Does sc.newAPIHadoopFile support multiple directories (or nested directories)?
Looking at the scaladoc:

    /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
    def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]

Your conclusion is confirmed.

On Tue, Mar 3, 2015 at 1:59 PM, S. Zhou myx...@yahoo.com.invalid wrote: I did some experiments and it seems not. But I'd like to get confirmation (or perhaps I missed something). If it does support them, could you let me know how to specify multiple folders? Thanks. Senqiang
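For what it's worth, the single path argument is handed to Hadoop's FileInputFormat, which expands glob patterns, so one newAPIHadoopFile call can often cover several sibling directories even though the API takes just one string. Whether truly nested directories are picked up depends on the Hadoop version and the input format (for example, the mapreduce.input.fileinputformat.input.dir.recursive setting). A minimal sketch, with hypothetical paths:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // A glob covering three hypothetical date-partitioned directories; the
    // pattern is expanded by FileInputFormat before any tasks are launched.
    val logs = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
      "hdfs:///logs/2015-03-0[1-3]/*")
    logs.map(_._2.toString).take(5).foreach(println)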