Re: Spark Streaming Application Got killed after 2 hours
Hi Saj,

What is the size of the input data that you are putting on the stream? Have you tried running the same application with a different set of data? It's odd that the streaming stops after exactly 2 hours. Try running the same application with data of different sizes to see whether it has something to do with a memory issue. Can you also provide a detailed error log?

Thanks.

On Sun, Nov 16, 2014 at 11:49 AM, SAJ [via Apache Spark User List] wrote:

Hi All,

I am trying to run a Spark Streaming application 24/7, but exactly after 2 hours it gets killed. I have tried again, and again it got killed after 2 hours. Following is the error log from the worker:

14/11/15 13:53:24 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-x,38863)
14/11/15 13:53:24 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-x,38863)
14/11/15 13:53:24 ERROR network.SendingConnection: Exception while reading SendingConnection to ConnectionManagerI

Has anybody faced this same issue?

Thanks Regards,
SAJ
How to kill/upgrade/restart driver launched in Spark standalone cluster+supervised mode?
Hello,

I have a Spark Standalone cluster running in HA mode. I launched an application using spark-submit with cluster and supervised mode enabled, and it launched successfully on one of the worker nodes. How can I stop/restart/kill or otherwise manage such a task running in a standalone cluster? There seem to be no options for this in the web interface. I wonder how I can upgrade my driver in the future.

Also, does supervised mode work across worker nodes? I.e., will the driver relaunch on another node if the current one dies, or does it only handle restarts on the same node after a driver crash?

I would love to hear others' experience with this :)

Thanks!
(PS: I am launching a Spark Streaming application)

// Jesper Lundgren
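[Editor's note: one option worth trying is the standalone deploy client that spark-submit uses under the hood for cluster mode. A minimal sketch, where the master URL and the driver ID (visible on the master web UI) are placeholders for your own values:

    # kill a driver that was submitted in cluster mode
    bin/spark-class org.apache.spark.deploy.Client kill spark://master-host:7077 driver-20141116123456-0001

For an upgrade, killing the old driver this way and re-running spark-submit with the new artifact is the usual approach.]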
Re: SparkSQL exception on cached parquet table
(Forgot to cc user mail list)

On 11/16/14 4:59 PM, Cheng Lian wrote:

Hey Sadhan,

Thanks for the additional information, this is helpful. It seems that some Parquet internal contract was broken, but I'm not sure whether it was caused by Spark SQL or Parquet, or even whether the Parquet file itself was somehow damaged. I'm investigating this. In the meanwhile, would you mind helping to narrow down the problem by trying to scan exactly the same Parquet file with some other systems (e.g. Hive or Impala)? If the other systems work, then there must be something wrong with Spark SQL.

Cheng

On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood sadhan.s...@gmail.com wrote:

Hi Cheng,

Thanks for your response. Here is the stack trace from the YARN logs:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
  at java.util.ArrayList.elementData(ArrayList.java:418)
  at java.util.ArrayList.get(ArrayList.java:431)
  at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
  at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
  at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
  at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
  at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
  at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
  at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
  at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
  at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
  at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
  at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
  ... 26 more

On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian lian.cs@gmail.com wrote:

Hi Sadhan,

Could you please provide the stack trace of the ArrayIndexOutOfBoundsException (if any)? The reason why the first query succeeds is that Spark SQL doesn't bother reading all the data from the table to compute COUNT(*). In the second case, however, the whole table is asked to be cached lazily via the cacheTable call, so it is scanned to build the in-memory columnar cache. Then something went wrong while scanning this LZO compressed Parquet file. But unfortunately the stack trace at hand doesn't indicate the root cause.
Cheng

On 11/15/14 5:28 AM, Sadhan Sood wrote:

While testing SparkSQL on a bunch of Parquet files (which basically used to be a partition of one of our Hive tables), I encountered this error:

import org.apache.spark.sql.SchemaRDD
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()  // works fine
sqlContext.cacheTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()  // fails with an exception

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://::9000/event_logs/xyz/20141109/part-9359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-9.lzo.parquet
  at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
  at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
  at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
  at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at
Re: Help with Spark Streaming
Hi,

Can anybody help me on this please? I haven't been able to find the problem :(

Thanks.

On Nov 15, 2014 4:48 PM, Bahubali Jain bahub...@gmail.com wrote:

Hi,

Trying to use Spark Streaming, but I am struggling with word count :(
I want to consolidate the output of the word count (not on a per-window basis), so I am using updateStateByKey(), but for some reason this is not working. The function itself is not being invoked (I do not see the sysout output on the console).

import java.util.List;
import java.util.regex.Pattern;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;

public final class WordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
      System.exit(1);
    }
    // Create the context with a 1 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
    ssc.checkpoint("/tmp/worcount");
    // Create a JavaReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
        args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            System.err.println("Got " + s);
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();
    wordCounts.updateStateByKey(new updateFunction());
    ssc.start();
    ssc.awaitTermination();
  }
}

class updateFunction implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
  @Override
  public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
    Integer x = new Integer(0);
    for (Integer i : values)
      x = x + i;
    Integer newSum = state.or(0) + x; // add the new values to the previous running count to get the new count
    System.out.println("Newsum is " + newSum);
    return Optional.of(newSum);
  }
}
Re: Help with Spark Streaming
I guess maybe you don't need to invoke reduceByKey() after mapToPair(), because updateStateByKey() already covers it. For your reference, here is a sample written in Scala, using a text file stream instead of a socket:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LocalStatefulWordCount extends App {
  val sparkConf = new SparkConf().setAppName("HdfsWordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(2))
  // must set a checkpoint directory for updateStateByKey
  // note: the checkpoint directory can not be the source directory
  ssc.checkpoint("./checkpoint")

  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  val lines = ssc.textFileStream("/Users/twer/workspace/scala101/data") // local directory
  val wordDstream = lines.flatMap(_.split(" ")).map(x => (x, 1))
  val statefulWordCount = wordDstream.updateStateByKey[Int](updateFunc)
  statefulWordCount.print()

  ssc.start()
  ssc.awaitTermination()
}

Zhang Yi / 张逸
Lead Consultant
Email yizh...@thoughtworks.com
Telephone +86 15023157626

On Sunday, November 16, 2014 at 6:19 PM, Bahubali Jain wrote:

Hi,

Can anybody help me on this please? I haven't been able to find the problem :(

Thanks.
Re: Using data in RDD to specify HDFS directory to write to
Can you check in the worker logs what exactly is happening?

Thanks
Best Regards

On Sun, Nov 16, 2014 at 2:54 AM, jschindler john.schind...@utexas.edu wrote:

UPDATE

I have removed and added things systematically to the job and have figured out that the inclusion of the construction of the SparkContext object is what is causing it to fail. The last run contained the code below. I keep losing executors, apparently, and I'm not sure why. Some of the relevant Spark output is below; I will add more on Monday as I must go participate in weekend activities.

14/11/15 14:53:43 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141115145328-0025/3 on hostPort cloudera01.local.company.com:7078 with 8 cores, 512.0 MB RAM
14/11/15 14:53:43 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/3 is now RUNNING
14/11/15 14:53:46 INFO MemoryStore: ensureFreeSpace(1063) called with curMem=1063, maxMem=309225062
14/11/15 14:53:46 INFO MemoryStore: Block input-0-1416084826000 stored as bytes to memory (size 1063.0 B, free 294.9 MB)
14/11/15 14:53:46 INFO BlockManagerInfo: Added input-0-1416084826000 in memory on cloudera01.local.company.com:49902 (size: 1063.0 B, free: 294.9 MB)
14/11/15 14:53:46 INFO BlockManagerMaster: Updated info of block input-0-1416084826000
14/11/15 14:53:46 WARN BlockManager: Block input-0-1416084826000 already exists on this machine; not re-adding it
14/11/15 14:53:46 INFO BlockGenerator: Pushed block input-0-1416084826000
14/11/15 14:53:46 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@cloudera01.local.company.com:52715/user/Executor#-1518587721] with ID 3
14/11/15 14:53:47 INFO BlockManagerInfo: Registering block manager cloudera01.local.company.com:46926 with 294.9 MB RAM
14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor 3 disconnected, so removing it
14/11/15 14:53:47 ERROR TaskSchedulerImpl: Lost an executor 3 (already removed): remote Akka client disassociated
14/11/15 14:53:47 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/3 is now EXITED (Command exited with code 1)
14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor app-20141115145328-0025/3 removed: Command exited with code 1
14/11/15 14:53:47 INFO AppClient$ClientActor: Executor added: app-20141115145328-0025/4 on worker-20141114114152-cloudera01.local.company.com-7078 (cloudera01.local.company.com:7078) with 8 cores

BLOCK 2 - last block before the app fails:

14/11/15 14:54:15 INFO BlockManagerInfo: Registering block manager cloudera01.local.uship.com:34335 with 294.9 MB RAM
14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor 9 disconnected, so removing it
14/11/15 14:54:16 ERROR TaskSchedulerImpl: Lost an executor 9 (already removed): remote Akka client disassociated
14/11/15 14:54:16 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/9 is now EXITED (Command exited with code 1)
14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor app-20141115145328-0025/9 removed: Command exited with code 1
14/11/15 14:54:16 ERROR SparkDeploySchedulerBackend: Application has been killed.
Reason: Master removed our application: FAILED
14/11/15 14:54:16 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: Master removed our application: FAILED

import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.json4s._
import org.json4s.native.JsonMethods._
import scala.collection.mutable.Map
import scala.collection.mutable.MutableList

case class Event(EventName: String, Payload: org.json4s.JValue)

object App {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "Data", Seconds(20))
    ssc.checkpoint("checkpoint")

    val conf = new SparkConf().setMaster("spark://cloudera01.local.company.com:7077")
    val sc = new SparkContext(conf)

    val eventMap = scala.collection.immutable.Map("Events" -> 1)
    val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)

    val eventStream = pipe.map(data => {
      parse(data)
    }).map(json => {
      implicit val formats = DefaultFormats
      val eventName = (json \ "event").extractOpt[String]
      Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
    })

    eventStream.foreach(x => {
      var arr = x.toArray
      x.foreachPartition(y => {
        y.foreach(z => { print(z) })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
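[Editor's note: one detail worth pointing out in the code above, as an observation rather than a confirmed root cause: it creates a StreamingContext against local[2] and a second, separate SparkContext against the standalone master, and running two contexts in one JVM is not supported in this Spark version. A sketch of the usual single-context arrangement:

    val conf = new SparkConf()
      .setMaster("spark://cloudera01.local.company.com:7077")
      .setAppName("Data")
    val ssc = new StreamingContext(conf, Seconds(20))  // creates its SparkContext internally
    ssc.checkpoint("checkpoint")
    val sc = ssc.sparkContext                          // reuse it if a plain RDD context is needed
]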
Re: Cancelled Key Exceptions on Massive Join
This usually happens when one of the workers is stuck in a GC pause and times out. Enable the following configurations on the SparkConf used to create the SparkContext:

conf.set("spark.rdd.compress", "true")
conf.set("spark.storage.memoryFraction", "1")
conf.set("spark.core.connection.ack.wait.timeout", "6000")
conf.set("spark.akka.frameSize", "100")

Thanks
Best Regards

On Sat, Nov 15, 2014 at 12:46 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

Hello all. I have been running a Spark job that eventually needs to do a large join, 24 million x 150 million. A broadcast join is clearly infeasible in this instance, so I am instead attempting to do it with hash partitioning, by defining a custom partitioner as:

class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {
  override def getPartition(key: Any): Int = key match {
    case k: Tuple2[Int, String] => super.getPartition(k._1)
    case _ => super.getPartition(key)
  }
}

I then partition both arrays using this partitioner. However, the job eventually fails with the following exception, which, if I had to guess, indicates that a network connection was interrupted during the shuffle stage, causing things to get lost and ultimately resulting in a fetch failure:

14/11/14 12:56:21 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(innovationdatanode08.cof.ds.capitalone.com,37590)
14/11/14 12:56:21 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@7369b398
14/11/14 12:56:21 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@7369b398
java.nio.channels.CancelledKeyException
  at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
  at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

In the Spark UI, I still see a substantial amount of shuffling going on at this stage. I am wondering if I'm perhaps using the partitioner incorrectly?
Re: User Authn and Authz in Spark missing ?
Have a look at this doc: http://spark.apache.org/docs/latest/security.html

You can also configure your network to only accept connections from a trusted CIDR. If you are using cloud services like EC2/Azure/GCE etc., then this is straightforward from their web portal. If you have a bunch of custom VPSs, then you might want to configure the iptables entries.

Thanks
Best Regards

On Fri, Nov 14, 2014 at 9:20 PM, Zeeshan Ali Shah zas...@pdc.kth.se wrote:

Hi, I am facing an issue as a cloud sysadmin: when a Spark master is launched on a public IP, anyone who knows the URL of the master can submit jobs to it. Is there any way/hack to have authn and authz in Spark? I tried to look into it but could not find anything. Any hint?

--
Regards

Zeeshan Ali Shah
System Administrator - PDC HPC
PhD researcher (IT security)
Kungliga Tekniska Hogskolan
+46 8 790 9115
http://www.pdc.kth.se/members/zashah
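[Editor's note: the security page above describes a shared-secret mechanism. A minimal sketch of enabling it on a standalone cluster, where the secret value is a placeholder and the same settings must be used when starting every daemon and application:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.authenticate", "true")                     // require a shared secret on connections
      .set("spark.authenticate.secret", "some-long-secret")  // placeholder; distribute out-of-band

Note this authenticates Spark's internal channels; it is not user-level authorization, so network-level restrictions as suggested above are still advisable.]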
Re: Status of MLLib exporting models to PMML
Manish and others,

A follow-up question on my mind is whether there are protobuf (or other binary format) frameworks in the vein of PMML. Perhaps scientific data storage frameworks like NetCDF or ROOT are possible as well. I like the comprehensiveness of PMML, but as you mention, the complexity of managing large models is a concern.

Cheers

On Fri, Nov 14, 2014 at 1:35 AM, Manish Amde manish...@gmail.com wrote:

@Aris, we are closely following the PMML work that is going on, and as Xiangrui mentioned, it might be easier to migrate models such as logistic regression first and then migrate trees. Some of the models get fairly large (as pointed out by Sung Chung), with deep trees as building blocks, and we might have to consider a distributed storage and prediction strategy.

On Tuesday, November 11, 2014, Xiangrui Meng men...@gmail.com wrote:

Vincenzo sent a PR and included k-means as an example. Sean is helping review it. The PMML standard is quite large, so we may start with simple model export, like linear methods, then move forward to tree-based models. -Xiangrui

On Mon, Nov 10, 2014 at 11:27 AM, Aris arisofala...@gmail.com wrote:

Hello Spark and MLlib folks,

A common problem in the real world of using machine learning is that some data analysts use tools like R, while the more engineering-minded out there use more advanced systems like Spark MLlib or even Python scikit-learn.

In the real world, I want to have a system where multiple different modeling environments can learn from data / build models, represent the models in a common language, and then have a layer which just takes the model and runs model.predict() all day long -- scores the models, in other words.

It looks like the project openscoring.io and jpmml-evaluator are some amazing systems for this, but they fundamentally use PMML as the model representation. I have read some JIRA tickets suggesting that Xiangrui Meng is interested in getting PMML implemented to export MLlib models; is that happening?

Further, would something like Manish Amde's boosted ensemble tree methods be representable in PMML?

Thank you!!
Aris

--
Charles
RE: filtering a SchemaRDD
Indeed it did. Thanks!

Ron

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Friday, November 14, 2014 9:53 PM
To: Daniel, Ronald (ELS-SDG)
Cc: user@spark.apache.org
Subject: Re: filtering a SchemaRDD

If I use row[6] instead of row['text'] I get what I am looking for. However, finding the right numeric index could be a pain. Can I access the fields in a Row of a SchemaRDD by name, so that I can map, filter, etc. without a trial-and-error process of finding the right int for the field name?

row.text should work. More examples here:
http://spark.apache.org/docs/1.1.0/sql-programming-guide.html#tab_python_2

Michael
Returning breeze.linalg.DenseMatrix from method
Hi,

I have a method that returns a DenseMatrix:

def func(str: String): DenseMatrix = {
  ...
}

But I keep getting this error: class DenseMatrix takes type parameters

I tried this too:

def func(str: String): DenseMatrix(Int, Int, Array[Double]) = {
  ...
}

But this gives me this error: '=' expected but '(' found

Any possible fixes?
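[Editor's note: the first error message points at the missing type parameter. A minimal sketch, assuming the matrix holds Doubles; the body here is a placeholder:

    import breeze.linalg.DenseMatrix

    def func(str: String): DenseMatrix[Double] = {
      // placeholder body: a 2x2 zero matrix
      DenseMatrix.zeros[Double](2, 2)
    }

breeze.linalg.DenseMatrix is generic in its element type, so the return type must be written as DenseMatrix[Double] (or DenseMatrix[Int], etc.), not as a constructor call.]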
Re: Communication between Driver and Executors
Hi,

On Fri, Nov 14, 2014 at 3:20 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

I wonder if SparkConf is dynamically updated on all worker nodes or only during initialization. It can be used to piggyback information. Otherwise I guess you are stuck with Broadcast. Primarily I have had these issues moving legacy MR operators to Spark, where MR piggybacks on the Hadoop conf pretty heavily; in Spark-native applications it is rarely required. Do you have a usecase like that?

My usecase is http://apache-spark-user-list.1001560.n3.nabble.com/StreamingContext-does-not-stop-td18826.html -- that is, notifying my Spark executors that the StreamingContext has been shut down. (Even with non-graceful shutdown, Spark doesn't seem to end the actual execution, just all the Spark-internal timers etc.) I need to do this properly or processing will go on for a very long time.

I have been trying to mis-use a broadcast variable as follows:
- create a class with a boolean var, set to true
- query this boolean on the executors as a prerequisite to processing the next item
- when I want to shut down, set the boolean to false and unpersist the broadcast variable (which will trigger re-delivery).

This is very dirty, but it works with a local[*] master. Unfortunately, when deployed on YARN, the new value never arrives at my executors.

Any idea what could go wrong on YARN with this approach -- or what is a good way to do this?

Thanks
Tobias
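[Editor's note: a minimal sketch of the pattern Tobias describes; all names are illustrative, and this is the approach reported to work only with a local master:

    // driver side
    class Flag extends Serializable { @volatile var keepRunning = true }
    val flag = new Flag
    val bcFlag = sc.broadcast(flag)

    // executor side, inside a transformation (process is a placeholder)
    rdd.filter(_ => bcFlag.value.keepRunning).map(process)

    // on shutdown, driver side: mutate, then unpersist hoping executors re-fetch
    flag.keepRunning = false
    bcFlag.unpersist()

In local mode the executors share the driver JVM, so the mutation is visible directly; on a real cluster the broadcast payload was serialized when the broadcast was created, which likely explains why the new value never arrives on YARN.]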
Interoperability between ScalaRDD, JavaRDD and PythonRDD
Hello,

Is it possible to reuse RDD implementations written in Scala/Java with PySpark?

Thanks,
Nam
re: How to incrementally compile spark examples using mvn
Thank you, Marcelo. I tried your suggestion (# mvn -pl :spark-examples_2.10 compile), but it tried to download many Spark components (as listed below) which I have already compiled on my server.

Downloading: https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.1.0/spark-core_2.10-1.1.0.pom
...
Downloading: https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.10/1.1.0/spark-streaming_2.10-1.1.0.pom
...
Downloading: https://repository.jboss.org/nexus/content/repositories/releases/org/apache/spark/spark-hive_2.10/1.1.0/spark-hive_2.10-1.1.0.pom
...

This problem didn't happen when I compiled the whole project using "mvn -DskipTests package". I guess some configuration has to be made to tell mvn that the dependencies are local. Any idea for that? Thank you for your help!

Cheers,
Yiming

-----Original Message-----
From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: November 16, 2014 10:26
To: sdi...@gmail.com
Cc: user@spark.apache.org
Subject: Re: How to incrementally compile spark examples using mvn

I haven't tried scala:cc, but you can ask maven to just build a particular sub-project. For example:

mvn -pl :spark-examples_2.10 compile

On Sat, Nov 15, 2014 at 5:31 PM, Yiming (John) Zhang sdi...@gmail.com wrote:

Hi, I have already successfully compiled and run the Spark examples. My problem is that if I make some modifications (e.g., on SparkPi.scala or LogQuery.scala) I have to use "mvn -DskipTests package" to rebuild the whole Spark project and wait a relatively long time.

I also tried "mvn scala:cc" as described in http://spark.apache.org/docs/latest/building-with-maven.html, but I could only get an infinite wait like:

[INFO] --- scala-maven-plugin:3.2.0:cc (default-cli) @ spark-parent ---
[INFO] wait for files to compile...

Is there any method to incrementally compile the examples using mvn? Thank you!

Cheers,
Yiming

--
Marcelo
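[Editor's note: a likely explanation, offered as an assumption rather than a verified diagnosis: when Maven builds a single module with -pl, it resolves the sibling modules from remote repositories unless they are installed in the local repository or pulled into the reactor. Two standard approaches:

    # install all modules into ~/.m2 once, so later -pl builds resolve locally
    mvn -DskipTests install

    # or: build the examples module plus the local modules it depends on
    mvn -pl :spark-examples_2.10 -am -DskipTests compile

After a one-time mvn install, `mvn -pl :spark-examples_2.10 compile` should pick up the locally installed artifacts instead of downloading them.]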
Re: SparkSQL exception on cached parquet table
Hi Cheng,

I tried reading the Parquet file (on which we were getting the exception) through parquet-tools, and it is able to dump the file and I can read the metadata, etc. I also loaded the file through a Hive table and can run a table scan query on it as well. Let me know if I can do more to help resolve the problem; I'll run it through a debugger and see if I can get more information on it in the meantime.

Thanks,
Sadhan

On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian lian.cs@gmail.com wrote:

(Forgot to cc user mail list)

On 11/16/14 4:59 PM, Cheng Lian wrote:

Hey Sadhan,

Thanks for the additional information, this is helpful. It seems that some Parquet internal contract was broken, but I'm not sure whether it was caused by Spark SQL or Parquet, or even whether the Parquet file itself was somehow damaged. I'm investigating this. In the meanwhile, would you mind helping to narrow down the problem by trying to scan exactly the same Parquet file with some other systems (e.g. Hive or Impala)? If the other systems work, then there must be something wrong with Spark SQL.

Cheng
Iterative changes to RDD and broadcast variables
Hi all,

I'm iterating over an RDD (representing a distributed matrix... I have to roll my own in Python) and making changes to different submatrices at each iteration. The loop structure looks something like:

for i in range(x):
    VAR = sc.broadcast(i)
    rdd.map(func1).reduceByKey(func2)
M = rdd.collect()

where func1 and func2 use the current value of VAR for that iteration. Because there aren't any actions in the main loop, nothing actually happens until the collect method is called. I'm running into problems I can't diagnose (*extremely* long execution time for no particular reason, among others); is this code even valid? If not, how should I make in-place iterative edits to different portions of a matrix, where each subsequent edit depends on the edits from the previous iteration?

Thanks in advance!
Re: Communication between Driver and Executors
Hi again,

On Mon, Nov 17, 2014 at 8:16 AM, Tobias Pfeiffer t...@preferred.jp wrote:

I have been trying to mis-use a broadcast variable as follows:
- create a class with a boolean var, set to true
- query this boolean on the executors as a prerequisite to processing the next item
- when I want to shut down, set the boolean to false and unpersist the broadcast variable (which will trigger re-delivery).
This is very dirty, but it works with a local[*] master. Unfortunately, when deployed on YARN, the new value never arrives at my executors.

In fact, it seems that changing a mutable object (like a mutable list) and unpersisting it in order to trigger a redeploy only works locally. When running on YARN, even after an unpersist, the value will always be identical to what I shipped first. Now I wonder what unpersist actually does in that case. Must I call unpersist from an executor or from the driver?

Thanks
Tobias
RDD.aggregate versus accumulables...
Hi All.

I am trying to get my head around why using accumulators and accumulables seems to be the most recommended method for accumulating running sums, averages, variances and the like, whereas the aggregate method seems to me to be the right one. I have no performance measurements as of yet, but it seems that aggregate is simpler and more intuitive (and it does what one might expect an accumulator to do), whereas accumulators and accumulables seem to have some extra complications and overhead.

So... what's the real difference between an accumulator/accumulable and aggregating an RDD? When is one method of aggregation preferred over the other?

Thanks,
Nate
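[Editor's note: for concreteness, a minimal sketch of computing a running sum and count both ways; illustrative only, assuming rdd is an RDD[Double]:

    // 1) aggregate: a single action whose return value is the result
    val (sum, count) = rdd.aggregate((0.0, 0L))(
      (acc, x) => (acc._1 + x, acc._2 + 1),     // fold a value into a partition's accumulator
      (a, b)   => (a._1 + b._1, a._2 + b._2))   // merge per-partition accumulators
    val mean = sum / count

    // 2) accumulator: a side channel updated inside some other action
    val sumAcc = sc.accumulator(0.0)
    rdd.foreach(x => sumAcc += x)  // value is only reliable after the action completes
    val total = sumAcc.value

One practical difference: aggregate is itself an action whose return value is the point, while accumulators piggyback on an existing action and are typically used for counters and metrics on the side.]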
Load json format dataset as RDD
Hi,

I am new to Spark. I met a problem when I intended to load one dataset. I have a dataset where the data is in JSON format, and I'd like to load it as an RDD. As one record may span multiple lines, SparkContext.textFile() is not suitable. I also tried to use json4s to parse the JSON manually and then merge the records into an RDD one by one, but this solution is inconvenient and inefficient.

It seems that there is JsonRDD in Spark SQL, but it seems that it is for query only. Could anyone provide me some suggestion about how to load JSON format data as an RDD? For example, given the file path, load the dataset as RDD[JObject].

Thank you very much!

Regards,
J
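[Editor's note: in the special case where each input file holds exactly one JSON document, a minimal sketch using wholeTextFiles plus json4s (the path is a placeholder; parse returns a JValue, of which JObject is a subtype):

    import org.json4s._
    import org.json4s.native.JsonMethods._

    // wholeTextFiles yields (path, fileContent) pairs, so multi-line
    // records are fine as long as one file == one record
    val records: org.apache.spark.rdd.RDD[JValue] =
      sc.wholeTextFiles("/path/to/json/dir").map { case (_, content) => parse(content) }

This does not cover many multi-line records per file; for that, see the input-format suggestion in the reply below.]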
Functions in Spark
Hi,

Is there any way to know which of my functions performs better in Spark? In other words, say I have implemented the same thing in two different ways. How do I judge which implementation is better than the other? Is processing time the only metric that we can use to claim the goodness of one implementation over the other?

Can anyone please share some thoughts on this?

Thank You
Re: Functions in Spark
Check this video out: https://www.youtube.com/watch?v=dmL0N3qfSc8&list=UURzsq7k4-kT-h3TDUBQ82-w

On Mon, Nov 17, 2014 at 9:43 AM, Deep Pradhan pradhandeep1...@gmail.com wrote:

Hi, Is there any way to know which of my functions performs better in Spark? In other words, say I have implemented the same thing in two different ways. How do I judge which implementation is better than the other? Is processing time the only metric that we can use to claim the goodness of one implementation over the other? Can anyone please share some thoughts on this?

Thank You
Re: Load json format dataset as RDD
SQLContext.jsonFile assumes one JSON record per line. Although I haven't tried it yet, it seems that this JsonInputFormat [1] can be helpful. You may read your original data set with SparkContext.hadoopFile and JsonInputFormat, then transform the resulting RDD[String] into a JsonRDD via SQLContext.jsonRDD.

[1] http://pivotal-field-engineering.github.io/pmr-common/pmr/apidocs/com/gopivotal/mapreduce/lib/input/JsonInputFormat.html

On 11/17/14 11:34 AM, J wrote:

Hi, I am new to Spark. I met a problem when I intended to load one dataset. [...]
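[Editor's note: a sketch of that suggestion. The key/value types of JsonInputFormat are an assumption here -- check the javadoc linked above for the actual signature -- and the path is a placeholder:

    import org.apache.hadoop.io.{LongWritable, Text}
    import com.gopivotal.mapreduce.lib.input.JsonInputFormat

    // assumes JsonInputFormat is a new-API FileInputFormat[LongWritable, Text]
    val jsonStrings = sc.newAPIHadoopFile[LongWritable, Text, JsonInputFormat](
        "/path/to/json").map(_._2.toString)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val schemaRDD = sqlContext.jsonRDD(jsonStrings)  // Spark SQL infers the schema
]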
Questions Regarding MPI Program Migration to Spark
Guys,

Recently we have been migrating our backend pipeline to Spark. In our pipeline, we have an MPI-based HAC (hierarchical agglomerative clustering) implementation. To ensure result consistency across the migration, we also want to migrate this MPI-implemented code to Spark. However, during the migration process, I found that there are some possible limitations with Spark.

In the original MPI implementation, the logic looks like the following; the master (node 0) and the slaves (nodes 1/2/.../P) all run the same loop:

Get the complete document data, store in g_doc_data
Get the document sub-set for which this node needs to calculate the distance metrics, store in l_dist_metric_data
while ( exit condition is not met ) {
  Find the locally closest node pair, notated as l_closest_pair
  Get the globally closest node pair from the other nodes via MPI's MPI_AllReduce, notated as g_closest_pair
  Merge the globally closest node pair and update the document data g_doc_data
  Re-calculate the distance metrics for those node pairs which will be impacted by the above merge operation, and update l_dist_metric_data
}

The essential difficulty in migrating the above logic to Spark is: in the original implementation, the computation nodes need to hold local state between iterations (namely g_doc_data and l_dist_metric_data), and in Spark there doesn't seem to be any effective way of keeping intermediate local state between iterations. Usually in Spark, we use either a broadcast variable or the closure to pass state to the operations of each iteration. Of course, after each iteration, we could summarize the change effects from all the worker nodes via reduce and then broadcast this summarized effect back to them again (a sketch of this pattern follows below). But this operation involves a significant data transfer when the data size is large (e.g., 100 thousand documents with 500-dimension feature vectors), and the performance penalty is non-negligible.

So my questions are:

1. Is the difficulty I mentioned above a limitation imposed by the computation paradigm of Spark?
2. Are there any possible ways of implementing bottom-up agglomerative hierarchical clustering algorithms in Spark?

BTW, I know there are some top-down divisive hierarchical clustering algorithms in the upcoming 1.2 release; I will also give them a try.

Thanks.

--
yangjun...@gmail.com
http://hi.baidu.com/yjpro
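[Editor's note: for reference, a minimal Scala sketch of the per-iteration reduce-and-broadcast pattern described above. All names (DocData, locallyClosestPair, mergePair, recompute, etc.) are illustrative placeholders, and this does nothing to avoid the data-transfer cost the question raises:

    var docData: DocData = initialDocData             // driver-side analogue of g_doc_data
    var distState = sc.parallelize(initialPartitions) // per-partition analogue of l_dist_metric_data

    while (!exitCondition(docData)) {
      val bcDocs = sc.broadcast(docData)
      // each partition finds its locally closest pair; reduce plays the role of MPI_AllReduce
      val globalClosest = distState
        .map(part => locallyClosestPair(part, bcDocs.value))
        .reduce((a, b) => if (a.dist < b.dist) a else b)
      docData = mergePair(docData, globalClosest)                         // driver-side merge
      distState = distState.map(part => recompute(part, globalClosest))  // update local metrics
      bcDocs.unpersist()
    }

The catch, as noted above, is that distState must be carried forward through RDD transformations (so the lineage grows every iteration and periodic caching or checkpointing would be needed), and docData is re-broadcast on every round.]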
Re: Functions in Spark
Thanks, I did go through the video and it was very informative, but I think I was looking for the Transformations section on the page https://spark.apache.org/docs/0.9.1/scala-programming-guide.html.

On Mon, Nov 17, 2014 at 10:31 AM, Samarth Mailinglist mailinglistsama...@gmail.com wrote:

Check this video out: https://www.youtube.com/watch?v=dmL0N3qfSc8&list=UURzsq7k4-kT-h3TDUBQ82-w

On Mon, Nov 17, 2014 at 9:43 AM, Deep Pradhan pradhandeep1...@gmail.com wrote:

Hi, Is there any way to know which of my functions performs better in Spark? In other words, say I have implemented the same thing in two different ways. How do I judge which implementation is better than the other? Is processing time the only metric that we can use to claim the goodness of one implementation over the other? Can anyone please share some thoughts on this?

Thank You

--
Thanks & Regards,
Mukesh Jha me.mukesh@gmail.com
Re: spark-submit question
You are changing these paths and filenames to match your own actual scripts and file locations, right?

On Nov 17, 2014 4:59 AM, Samarth Mailinglist mailinglistsama...@gmail.com wrote:

I am trying to run a job written in Python with the following command:

bin/spark-submit --master spark://localhost:7077 /path/spark_solution_basic.py --py-files /path/*.py --files /path/config.properties

I always get an exception that config.properties is not found:

INFO - IOError: [Errno 2] No such file or directory: 'config.properties'

Why isn't this working?
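[Editor's note: one likely culprit worth checking, offered as an observation about spark-submit's argument parsing rather than a confirmed diagnosis of this case: spark-submit only parses flags that appear before the primary application file; everything after the .py is passed through to the application as its own arguments. So the --py-files and --files above are probably being handed to spark_solution_basic.py instead of to spark-submit. Reordering should help (deps.py is a placeholder):

    bin/spark-submit --master spark://localhost:7077 \
      --py-files /path/deps.py \
      --files /path/config.properties \
      /path/spark_solution_basic.py

Also note that --py-files expects a comma-separated list (or a .zip/.egg) rather than a shell glob, and --files ships the file into the working directory of the executors.]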