Re: Optimizing reduce for 'huge' aggregated outputs.
Can you key your RDD by some key and use reduceByKey? In fact if you are merging bunch of maps you can create a set of (k, v) in your mapPartitions and then reduceByKey using some merge function. The reduce will happen in parallel on multiple nodes in this case. You'll end up with just a single set of k, v per partition which you can reduce or collect and merge on the driver. — Sent from Mailbox On Tue, Jun 10, 2014 at 1:05 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I suppose what I want is the memory efficiency of toLocalIterator and the speed of collect. Is there any such thing? On Mon, Jun 9, 2014 at 3:19 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Hello, I noticed that the final reduce function happens in the driver node with a code that looks like the following. val outputMap = mapPartition(domsomething).reduce(a: Map, b: Map) { a.merge(b) } although individual outputs from mappers are small. Over time the aggregated result outputMap could be huuuge (say with hundreds of millions of keys and values, reaching giga bytes). I noticed that, even if we have a lot of memory in the driver node, this process becomes realy slow eventually (say we have 100+ partitions. the first reduce is fast, but progressively, it becomes veeery slow as more and more partition outputs get aggregated). Is this because the intermediate reduce output gets serialized and then deserialized every time? What I'd like ideally is, since reduce is taking place in the same machine any way, there's no need for any serialization and deserialization, and just aggregate the incoming results into the final aggregation. Is this possible?
Re: Optimizing reduce for 'huge' aggregated outputs.
Hi Nick, How does reduce work? I thought after reducing in the executor, it will reduce in parallel between multiple executors instead of pulling everything to driver and reducing there. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Mon, Jun 9, 2014 at 11:07 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Can you key your RDD by some key and use reduceByKey? In fact if you are merging bunch of maps you can create a set of (k, v) in your mapPartitions and then reduceByKey using some merge function. The reduce will happen in parallel on multiple nodes in this case. You'll end up with just a single set of k, v per partition which you can reduce or collect and merge on the driver. — Sent from Mailbox On Tue, Jun 10, 2014 at 1:05 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I suppose what I want is the memory efficiency of toLocalIterator and the speed of collect. Is there any such thing? On Mon, Jun 9, 2014 at 3:19 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Hello, I noticed that the final reduce function happens in the driver node with a code that looks like the following. val outputMap = mapPartition(domsomething).reduce(a: Map, b: Map) { a.merge(b) } although individual outputs from mappers are small. Over time the aggregated result outputMap could be huuuge (say with hundreds of millions of keys and values, reaching giga bytes). I noticed that, even if we have a lot of memory in the driver node, this process becomes realy slow eventually (say we have 100+ partitions. the first reduce is fast, but progressively, it becomes veeery slow as more and more partition outputs get aggregated). Is this because the intermediate reduce output gets serialized and then deserialized every time? What I'd like ideally is, since reduce is taking place in the same machine any way, there's no need for any serialization and deserialization, and just aggregate the incoming results into the final aggregation. Is this possible?
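For reference, the pattern Nick describes might look roughly like the following in Scala (a minimal sketch; the key extraction and the merge function are illustrative placeholders, not from the original posts):

import org.apache.spark.SparkContext._   // pair-RDD functions such as reduceByKey

// Emit (key, value) pairs from mapPartitions instead of one big Map per partition.
val pairs = sc.textFile("hdfs:///path/to/input").mapPartitions { iter =>
  iter.map { line =>
    val fields = line.split(",")
    (fields(0), 1L)                      // hypothetical key and count
  }
}

// The merge now runs in parallel on the executors, not on the driver.
val merged = pairs.reduceByKey(_ + _)
merged.saveAsTextFile("hdfs:///path/to/output")   // or merged.collect() if the result fits on the driver

Only the already-reduced (k, v) pairs ever reach the driver, which avoids the ever-growing single Map that the reduce-on-driver approach builds up.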
Writing data to HBase using Spark
Hi, I am reading data from an HBase table into an RDD and then, using foreach on that RDD, I am doing some processing on every Result of the HBase table. After this processing I want to store the processed data back into another HBase table. How can I do that? If I use the standard Hadoop and HBase classes to write data to HBase, I run into serialization issues. How should I write data to HBase in this case? Thanks, -Vibhor
Re: Writing data to HBase using Spark
Please see sample code attached at https://issues.apache.org/jira/browse/SPARK-944. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-data-to-HBase-using-Spark-tp7304p7305.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
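The JIRA sample itself is not reproduced here, but a common shape for this kind of write (a rough sketch, assuming an HBase 0.94-style client API; the table, column family and RDD names are made up) is to open the HBase connection inside foreachPartition so that nothing non-serializable gets captured by the closure:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

// processedRdd: RDD[(String, String)] of (rowKey, value) -- hypothetical shape
processedRdd.foreachPartition { partition =>
  // Build the non-serializable HBase client on the worker, once per partition,
  // so it is never shipped from the driver.
  val conf = HBaseConfiguration.create()
  val table = new HTable(conf, "output_table")
  partition.foreach { case (rowKey, value) =>
    val put = new Put(Bytes.toBytes(rowKey))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
    table.put(put)
  }
  table.close()
}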
Shark over Spark-Streaming
Is it possible to use Shark over Streaming data? I did not find any mention of that on the website. When you run shark it gives you a shell to run your queries for stored data. Is there any way to do the same over streaming data? -- Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shark-over-Spark-Streaming-tp7307.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Is Spark-1.0.0 not backward compatible with Shark-0.9.1 ?
And if you want to use the SQL CLI (based on catalyst) as it works in Shark, you can also check out https://github.com/amplab/shark/pull/337 :) This preview version doesn’t require the Hive to be setup in the cluster. (Don’t forget to put the hive-site.xml under SHARK_HOME/conf also) Cheng Hao From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Saturday, June 07, 2014 2:22 AM To: user@spark.apache.org Subject: Re: Is Spark-1.0.0 not backward compatible with Shark-0.9.1 ? There is not an official updated version of Shark for Spark-1.0 (though you might check out the untested spark-1.0 branch on the github). You can also check out the preview release of Shark that runs on Spark SQL: https://github.com/amplab/shark/tree/sparkSql Michael On Fri, Jun 6, 2014 at 6:02 AM, bijoy deb bijoy.comput...@gmail.commailto:bijoy.comput...@gmail.com wrote: Hi, I am trying to run build Shark-0.9.1 from source,with Spark-1.0.0 as its dependency,using sbt package command.But I am getting the below error during build,which is making me think that perhaps Spark-1.0.0 is not compatible with Shark-0.9.1: [info] Compilation completed in 9.046 s [error] /vol1/shark/src/main/scala/shark/api/JavaTableRDD.scala:57: org.apache.spark.api.java.function.Function[shark.api.Row,Boolean] does not take parameters [error] wrapRDD(rdd.filter((x = f(x).booleanValue( [error] ^ [error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:84: type mismatch; [error] found : String [error] required: org.apache.spark.serializer.Serializer [error] new ShuffleDependency[Any, Any](rdd, part, SharkEnv.shuffleSerializerName) [error] ^ [error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:120: value serializerManager is not a member of org.apache.spark.SparkEnv [error] val serializer = SparkEnv.get.serializerManager.get(SharkEnv.shuffleSerializerName, SparkEnv.get.conf) [error] ^ [warn] /vol1/shark/src/main/scala/shark/execution/ExtractOperator.scala:111: non-variable type argument (shark.execution.ReduceKey, Any) in type pattern org.apache.spark.rdd.RDD[(shark.execution.ReduceKey, Any)] is unchecked since it is eliminated by erasure [warn] case r: RDD[(ReduceKey, Any)] = RDDUtils.sortByKey(r) [warn] ^ [error] /vol1/shark/src/main/scala/shark/execution/GroupByPostShuffleOperator.scala:204: type mismatch; [error] found : String [error] required: org.apache.spark.serializer.Serializer [error] .setSerializer(SharkEnv.shuffleSerializerName) [error] ^ . ... Can you please suggest if there is any way to use the Shark with the new Spark-1.0.0 version? Thanks Bijoy
Re: Spark 1.0.0 Maven dependencies problems.
Thanks for the hint. I removed the signature info from the same jar and the JVM is happy now. But the problem remains: several copies of the same jar but with different versions, which is not good. Spark itself is very, very promising; I am very excited. Thank you all toivo -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-Maven-dependencies-problems-tp7247p7309.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Problem in Spark Streaming
I am running a spark streaming job to count the top 10 hashtags over the last 5 mins window, querying every 1 sec. It takes approx 1.4 sec (end-to-end delay) to answer most queries, but there are a few instances in between when it takes considerably more time (around 15 sec), due to which the response time of further queries also increases. I am not able to debug the reason for these spikes. The data rate is nearly constant, so the spike is not due to a sudden increase in the data rate. Also, is there any way to put a bound on the time taken by a particular query? For example, if a particular query takes more than say 2 sec, it should kill that query and move on to the next one, so that a slow query does not affect future queries. Thanx, Nilesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
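For context, a windowed hashtag count of the kind described above might be expressed roughly as follows (a sketch only; the stream setup is omitted and `hashtags` is assumed to be a DStream[String] of individual hashtags):

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.StreamingContext._

// Sliding count over a 5-minute window, recomputed every second.
// The inverse-reduce form requires checkpointing to be enabled.
val counts = hashtags.map((_, 1)).reduceByKeyAndWindow(_ + _, _ - _, Minutes(5), Seconds(1))

// Print the top 10 tags for each window.
counts.foreachRDD { rdd =>
  val top10 = rdd.map { case (tag, c) => (c, tag) }.sortByKey(ascending = false).take(10)
  println(top10.mkString(", "))
}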
Re: Problem in Spark Streaming
Hi Nilmish, I confront the same problem. I am wondering how do you measure the latency? Regards, Yingjun -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7311.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Problem in Spark Streaming
You can measure the latency from the logs. Search for phrases like "Total delay" in the logs; this denotes the total end-to-end delay for a particular query. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7312.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
pmml with augustus
hello guys, does anybody have experience with the library Augustus as a serializer for scoring models? It looks very promising and I even found a hint on the connection between Augustus and Spark. all the best -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pmml-with-augustus-tp7313.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: pmml with augustus
It's worth mentioning that Augustus is a Python-based library. On a related note, in Java-land, I have had good experiences with jpmml's projects: On Tue, Jun 10, 2014 at 7:52 AM, filipus floe...@gmail.com wrote: hello guys, has anybody experiances with the library augustus as a serializer for scoring models? looks very promising and i even found a hint on the connection augustus and spark all the best -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pmml-with-augustus-tp7313.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Performance of Akka or TCP Socket input sources vs HDFS: Data locality in Spark Streaming
Hello! Spark Streaming supports HDFS as input source, and also Akka actor receivers, or TCP socket receivers. For my use case I think it's probably more convenient to read the data directly from Actors, because I already need to set up a multi-node Akka cluster (on the same nodes that Spark runs on) and write some actors to perform some parallel operations. Writing actor receivers to consume the results of my business-logic actors and then feed into Spark is pretty seamless. Note that the actors generate a large amount of data (a few GBs to tens of GBs). The other option would be to setup HDFS on the same cluster as Spark, write the data from the Actors to HDFS, and then use HDFS as input source for Spark Streaming. Does this result in better performance due to data locality (with HDFS data replication turned on)? I think performance should be almost the same with actors, since Spark workers local to the worker actors should get the data fast, and some optimization like this is definitely done I assume? I suppose the only benefit with HDFS would be better fault tolerance, and the ability to checkpoint and recover even if master fails. Cheers, Nilesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-of-Akka-or-TCP-Socket-input-sources-vs-HDFS-Data-locality-in-Spark-Streaming-tp7317.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Calling JavaPairRDD.first after calling JavaPairRDD.groupByKey results in NullPointerException
I am getting a strange null pointer exception when trying to list the first entry of a JavaPairRDD after calling groupByKey on it. Following is my code:

JavaPairRDD<Tuple3<String, String, String>, List<String>> KeyToAppList = KeyToApp.distinct().groupByKey();
// System.out.println("First member of the key-val list: " + KeyToAppList.first());
// Above call to .first causes a null pointer exception

JavaRDD<Integer> KeyToAppCount = KeyToAppList.map(
  new Function<Tuple2<Tuple3<String, String, String>, List<String>>, Integer>() {
    @Override
    public Integer call(Tuple2<Tuple3<String, String, String>, List<String>> tupleOfTupAndList) throws Exception {
      List<String> apps = tupleOfTupAndList._2;
      Set<String> uniqueApps = new HashSet<String>(apps);
      return uniqueApps.size();
    }
  });
System.out.println("First member of the key-val list: " + KeyToAppCount.first());
// Above call to .first prints the first element all right.

The first call to JavaPairRDD.first() results in a null pointer exception. However, if I comment out the call to JavaPairRDD.first(), and instead proceed onto applying the map function, the call to first() doesn't raise any exception. Why the null pointer exception immediately after applying groupByKey? The null pointer exception looks like follows:

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: java.lang.NullPointerException
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Calling-JavaPairRDD-first-after-calling-JavaPairRDD-groupByKey-results-in-NullPointerException-tp7318.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Performance of Akka or TCP Socket input sources vs HDFS: Data locality in Spark Streaming
Hey Nilesh, Great to hear your using Spark Streaming, in my opinion the crux of your question comes down to what you want to do with the data in the future and/or if there is utility it using it from more than one Spark/Streaming job. 1). *One-time-use fire and forget *- as you rightly point out, hooking up to the Akka actors makes sense if the usefulness of the data is short-lived and you don't need the ability to readily go back into archived data. 2). *Fault tolerance multiple uses* - consider using a message queue like Apache Kafka [1], write messages from your Akka Actors into a Kafka topic with multiple partitions and replication. Then use Spark Streaming job(s) to read from Kafka. You can tune Kafka to keep the last *N* days data online so if your Spark Streaming job dies it can pickup at the point it left off. 3). *Keep indefinitely* - files in HDFS, 'nuff said. We're currently using (2) Kafka (3) HDFS to process around 400M web clickstream events a week. Everything is written into Kafka and kept 'online' for 7 days, and also written out to HDFS in compressed date-sequential files. We use several Spark Streaming jobs to process the real-time events straight from Kafka. Kafka supports multiple consumers so each job sees his own view of the message queue and all its events. If any of the Streaming jobs die or are restarted they continue consuming from Kafka from the last processed message without effecting any of the other consumer processes. Best, MC [1] http://kafka.apache.org/ On 10 June 2014 13:05, Nilesh Chakraborty nil...@nileshc.com wrote: Hello! Spark Streaming supports HDFS as input source, and also Akka actor receivers, or TCP socket receivers. For my use case I think it's probably more convenient to read the data directly from Actors, because I already need to set up a multi-node Akka cluster (on the same nodes that Spark runs on) and write some actors to perform some parallel operations. Writing actor receivers to consume the results of my business-logic actors and then feed into Spark is pretty seamless. Note that the actors generate a large amount of data (a few GBs to tens of GBs). The other option would be to setup HDFS on the same cluster as Spark, write the data from the Actors to HDFS, and then use HDFS as input source for Spark Streaming. Does this result in better performance due to data locality (with HDFS data replication turned on)? I think performance should be almost the same with actors, since Spark workers local to the worker actors should get the data fast, and some optimization like this is definitely done I assume? I suppose the only benefit with HDFS would be better fault tolerance, and the ability to checkpoint and recover even if master fails. Cheers, Nilesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-of-Akka-or-TCP-Socket-input-sources-vs-HDFS-Data-locality-in-Spark-Streaming-tp7317.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
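For option (2), the consuming side might look roughly like this (a sketch; the ZooKeeper address, consumer group and topic name are placeholders, and `ssc` is assumed to be an existing StreamingContext):

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

// topic -> number of receiver threads for that topic
val topics = Map("clickstream" -> 4)
val events = KafkaUtils.createStream(ssc, "zk1:2181", "my-spark-consumer-group", topics)

// createStream yields (key, message) pairs; count the messages per batch.
events.map(_._2).count().print()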
Re: Problem in Spark Streaming
Hi Nilmish, What's the data rate/node when you see the high latency? (It seems the latency keeps increasing.) Do you still see it if you lower the data rate or the frequency of the windowed query? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7321.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Problem in Spark Streaming
How can I measure the data rate/node? I am feeding the data through the Kafka API. I only know the total inflow data rate, which remains almost constant. How can I figure out what amount of data is distributed to the nodes in my cluster? Latency does not keep on increasing indefinitely. It goes up for some instants and then it drops down again to the normal level. I want to get rid of these spikes in between. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7325.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: abnormal latency when running Spark Streaming
Hi Yingjun, Do you see a stable latency or the latency keeps increasing? And could you provide some details about the input data rate/node, batch interval, windowDuration and slideDuration when you see the high latency? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/abnormal-latency-when-running-Spark-Streaming-tp7315p7324.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark Streaming socketTextStream
Good morning, I have taken the socketTextStream example and instead of running on a local Spark instance, I have pushed it to my Spark cluster in AWS (1 master with 5 slave nodes). I am getting the following error that appears to indicate that all the slaves are trying to read from localhost: when all I really want is the single master node to read from it's localhost: and batch up what it receives. Can anyone help me with what I might be missing with the way I am submitting the job? 14/06/10 13:12:49 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka.tcp://spark@SLAVE-INTERNAL-IP:39710 14/06/10 13:12:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost: - java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.init(Socket.java:425) at java.net.Socket.init(Socket.java:208) at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:71) at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:57) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-socketTextStream-tp7326.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Problem in Spark Streaming
Oh, I mean the average data rate/node. But in case I want to know the input activities to each node (I use a custom receiver instead of Kafka), I usually search these records in logs to get a sense: BlockManagerInfo: Added input ... on [hostname:port] (size: xxx KB) I also see some spikes in latency as I posted earlier: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-achieve-reasonable-performance-on-Spark-Streaming-tp7262.html It's even worse as the spikes cause the latency to increase infinitely when the data rate is a little high, although the machines are underutilized. I can't explain it either. I'm not sure if the cause is the same as yours. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7327.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming socketTextStream
You can use the master's IP address (Or whichever machine you chose to run the nc command) instead of localhost.
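In other words (a minimal sketch; the IP, port and `ssc` StreamingContext are placeholders), the receiver should point at whichever host is actually running `nc -lk 9999`:

import org.apache.spark.streaming.StreamingContext._

// On the host running the netcat server: nc -lk 9999
val lines = ssc.socketTextStream("MASTER_IP", 9999)   // not "localhost"
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()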
Re: Spark Streaming socketTextStream
Worked! Thanks so much! Fred Fred Wolfinger Research Staff Member, CyberPoint Labs CyberPoint International, 621 East Pratt Street, Suite 300, Baltimore MD 21202-3140 www.cyberpointllc.com From: Akhil Das-2 [via Apache Spark User List] ml-node+s1001560n7328...@n3.nabble.com Date: Tuesday, June 10, 2014 10:16 AM To: Fred Wolfinger fwolfin...@cyberpointllc.com Subject: Re: Spark Streaming socketTextStream You can use the master's IP address (Or whichever machine you chose to run the nc command) instead of localhost. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-socketTextStream-tp7326p7330.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: pmml with augustus
I should point out that if you don't want to take a polyglot approach to languages and reside solely in the JVM, then you can just use plain old java serialization on the Model objects that come out of MLlib's APIs from Java or Scala and load them up in another process and call the relevant .predict() method when it comes time to serve. The same approach would probably also work for models trained via MLlib's python APIs, but I haven't tried that. Native PMML serialization would be a nice feature to add to MLlib as a mechanism to transfer models to other environments for further analysis/serving. There's a JIRA discussion about this here: https://issues.apache.org/jira/browse/SPARK-1406 On Tue, Jun 10, 2014 at 10:53 AM, filipus floe...@gmail.com wrote: Thank you very much the cascading project i didn't recognize it at all till now this project is very interesting also I got the idea of the usage of scala as a language for spark - becuase i can intergrate jvm based libraries very easy/naturaly when I got it right mh... but I could also use sparc as a model engine, augustus for the serializer and a third party produkt for the prediction engine like using jpmml mh... got the feeling that i need to do java, scala and python at the same time... first things first - augustus for an pmml output from spark :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pmml-with-augustus-tp7313p7335.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
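As a rough sketch of the plain-Java-serialization route mentioned above (assuming the model class is java.io.Serializable, which holds for the simple linear models; the path, model type and feature vector are illustrative):

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vectors

// Training side: write the fitted model out with standard Java serialization.
val out = new ObjectOutputStream(new FileOutputStream("/tmp/model.bin"))
out.writeObject(model)   // model: a trained LogisticRegressionModel
out.close()

// Serving side: a plain JVM process, no SparkContext required.
val in = new ObjectInputStream(new FileInputStream("/tmp/model.bin"))
val loaded = in.readObject().asInstanceOf[LogisticRegressionModel]
in.close()
val score = loaded.predict(Vectors.dense(0.5, 1.2, -0.3))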
HDFS Server/Client IPC version mismatch while trying to access HDFS files using Spark-0.9.1
Hi all, I have built Shark-0.9.1 from source using sbt with the below command:

SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.6.0 sbt/sbt assembly

My Hadoop cluster is also on version 2.0.0-mr1-cdh4.6.0. But when I try to execute the below command from the Spark shell, which reads a file from HDFS, I get an IPC version mismatch error (IPC version 7 on server versus IPC version 4 on client) from the org.apache.hadoop.hdfs.DFSClient class:

scala> val s = sc.textFile("hdfs://host:port/test.txt")
scala> s.count()
14/06/10 23:42:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/10 23:42:59 WARN snappy.LoadSnappy: Snappy native library not loaded
org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy9.getProtocolVersion(Unknown Source)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)

Apparently this error is because of a version mismatch of the hadoop-hdfs jar between the client (the one referred to by Spark) and the server (the Hadoop cluster). But what I don't understand is why there is a mismatch, since I built Spark with the correct Hadoop version. Any suggestions would be highly appreciated. Thanks Bijoy
Re: Can't find pyspark when using PySpark on YARN
Hi Qi Ping, You don't have to distribute these files; they are automatically packaged in the assembly jar, which is already shipped to the worker nodes. Other people have run into the same issue. See if the instructions here are of any help: http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3ccamjob8mr1+ias-sldz_rfrke_na2uubnmhrac4nukqyqnun...@mail.gmail.com%3e As described in the link, the last resort is to try building your assembly jar with JAVA_HOME set to Java 6. This usually fixes the problem (more details in the link provided). Cheers, Andrew 2014-06-10 6:35 GMT-07:00 李奇平 qiping@alibaba-inc.com: Dear all, When I submit a pyspark application using this command: ./bin/spark-submit --master yarn-client examples/src/main/python/wordcount.py hdfs://... I get the following exception: Error from python worker: Traceback (most recent call last): File /usr/ali/lib/python2.5/runpy.py, line 85, in run_module loader = get_loader(mod_name) File /usr/ali/lib/python2.5/pkgutil.py, line 456, in get_loader return find_loader(fullname) File /usr/ali/lib/python2.5/pkgutil.py, line 466, in find_loader for importer in iter_importers(fullname): File /usr/ali/lib/python2.5/pkgutil.py, line 422, in iter_importers __import__(pkg) ImportError: No module named pyspark PYTHONPATH was: /home/xxx/spark/python:/home/xxx/spark_on_yarn/python/lib/py4j-0.8.1-src.zip:/disk11/mapred/tmp/usercache//filecache/11/spark-assembly-1.0.0-hadoop2.0.0-ydh2.0.0.jar Maybe `pyspark/python` and `py4j-0.8.1-src.zip` is not included in the YARN worker, How can I distribute these files with my application? Can I use `--pyfiles python.zip, py4j-0.8.1-src.zip `? Or how can I package modules in pyspark to a .egg file?
Re: Spark Logging
Event logs are different from writing using a logger, like log4j. The event logs are the type of data showing up in the history server. For my team, we use com.typesafe.scalalogging.slf4j.Logging. Our logs show up in /etc/spark/work/app-id/executor-id/stderr and stdout. All of our logging seems to show up in stderr. -Suren On Tue, Jun 10, 2014 at 2:56 PM, coderxiang shuoxiang...@gmail.com wrote: By default, the logs are available at `/tmp/spark-events`. You can specify the log directory via spark.eventLog.dir, see this configuration page http://spark.apache.org/docs/latest/configuration.html . -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Logging-tp7340p7343.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
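A tiny sketch of that application-side logger setup (scalalogging-slf4j 1.x style; the class name and messages are made up), as opposed to the event logs consumed by the history server:

import com.typesafe.scalalogging.slf4j.Logging

class ScoringJob extends Logging {
  def run(): Unit = {
    logger.info("starting scoring run")                             // ends up in the executor's stderr
    logger.warn("application logging, separate from spark.eventLog.dir")
  }
}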
getting started with mllib.recommendation.ALS
Question on the input and output for ALS.train() and MatrixFactorizationModel.predict(). My input is a list of Ratings(user_id, product_id, rating) and my ratings are on a scale of 1-5 (inclusive). When I compute predictions over the superset of all (user_id, product_id) pairs, the ratings produced are on a different scale. The question is this: do I need to normalize the data coming out of predict() to my own scale or does the input need to be different? Thanks!
Re: NoSuchMethodError in KafkaReciever
Hi, I have the same problem when running Kafka to Spark Streaming pipeline from Java with explicitely specified message decoders. I had thought, that it was related to Eclipse environment, as suggested here, but it's not the case. I have coded an example based on class: https://github.com/apache/spark/blob/branch-0.9/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java and have builded shaded uber jar with all the deps and tried to run it from command line. When I use the createStream method from the example class like this: KafkaUtils.createStream(jssc, zookeeper:port, test, topicMap); everything is working fine, but when I explicitely specify message decoder classes used in this method with another overloaded createStream method: KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, props, topicMap, StorageLevels.MEMORY_AND_DISK_2); the applications stops with an error: 14/06/10 22:28:06 ERROR kafka.KafkaReceiver: Error receiving data java.lang.NoSuchMethodException: java.lang.Object.init(kafka.utils.VerifiableProperties) at java.lang.Class.getConstructor0(Unknown Source) at java.lang.Class.getConstructor(Unknown Source) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:108) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126) I have tried Spark versions 0.9.0-incubating, 0.9.0 and 0.9.1, but the error occurs everywhere. Kafka StringDecoder class has the constructor with VerifiableProperties parameter and all required classes are in the same uber jar, so it is strange that scala/java cannot find it with reflection api. Maybe there is some problem with Manifest/ClassTag usage in KafkaUtils or KafkaInputDStream classes, but I'm not a Scala expert and cannot be sure about it. The problematic code is the same from version 0.9 to the current one, so it's still there. Unit test from the Spark project is working fine with every KafkaUtils method, because the test does not try to register the kafka stream, only checks the interface. Currently it is possible to use Kafka to Spark Streaming pipeline from Java only with the default String message decoders, which makes this tool almost useless (unless you are a great JSON fan). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-in-KafkaReciever-tp2209p7347.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NoSuchMethodError in KafkaReciever
I had this same problem as well. I ended up just adding the necessary code in KafkaUtil and compiling my own spark jar. Something like this for the raw stream: def createRawStream( jssc: JavaStreamingContext, kafkaParams: JMap[String, String], topics: JMap[String, JInt] ): JavaPairDStream[Array[Byte], Array[Byte]] = { new KafkaInputDStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder]( jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), StorageLevel.MEMORY_AND_DISK_SER_2) } On Tue, Jun 10, 2014 at 2:15 PM, mpieck mpi...@gazeta.pl wrote: Hi, I have the same problem when running Kafka to Spark Streaming pipeline from Java with explicitely specified message decoders. I had thought, that it was related to Eclipse environment, as suggested here, but it's not the case. I have coded an example based on class: https://github.com/apache/spark/blob/branch-0.9/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java and have builded shaded uber jar with all the deps and tried to run it from command line. When I use the createStream method from the example class like this: KafkaUtils.createStream(jssc, zookeeper:port, test, topicMap); everything is working fine, but when I explicitely specify message decoder classes used in this method with another overloaded createStream method: KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, props, topicMap, StorageLevels.MEMORY_AND_DISK_2); the applications stops with an error: 14/06/10 22:28:06 ERROR kafka.KafkaReceiver: Error receiving data java.lang.NoSuchMethodException: java.lang.Object.init(kafka.utils.VerifiableProperties) at java.lang.Class.getConstructor0(Unknown Source) at java.lang.Class.getConstructor(Unknown Source) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:108) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126) I have tried Spark versions 0.9.0-incubating, 0.9.0 and 0.9.1, but the error occurs everywhere. Kafka StringDecoder class has the constructor with VerifiableProperties parameter and all required classes are in the same uber jar, so it is strange that scala/java cannot find it with reflection api. Maybe there is some problem with Manifest/ClassTag usage in KafkaUtils or KafkaInputDStream classes, but I'm not a Scala expert and cannot be sure about it. The problematic code is the same from version 0.9 to the current one, so it's still there. Unit test from the Spark project is working fine with every KafkaUtils method, because the test does not try to register the kafka stream, only checks the interface. Currently it is possible to use Kafka to Spark Streaming pipeline from Java only with the default String message decoders, which makes this tool almost useless (unless you are a great JSON fan). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-in-KafkaReciever-tp2209p7347.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: getting started with mllib.recommendation.ALS
For trainImplicit(), the output is an approximation of a matrix of 0s and 1s, so the values are generally (not always) in [0,1] But for train(), you should be predicting the original input matrix as-is, as I understand. You should get output in about the same range as the input but again not necessarily 1-5. If it's really different, you could be underfitting. Try less lambda, more features? On Tue, Jun 10, 2014 at 4:59 PM, Sandeep Parikh sand...@clusterbeep.org wrote: Question on the input and output for ALS.train() and MatrixFactorizationModel.predict(). My input is list of Ratings(user_id, product_id, rating) and my ratings are one a scale of 1-5 (inclusive). When I compute predictions over the superset of all (user_id, product_id) pairs, the ratings produced are on a different scale. The question is this: do I need to normalize the data coming out of predict() to my own scale or does the input need to be different? Thanks!
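For reference, the explicit-feedback path looks roughly like this (a sketch; the input file format, rank, iteration count and lambda are illustrative values to tune, not recommendations):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val ratings = sc.textFile("ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(",")
  Rating(user.toInt, product.toInt, rating.toDouble)   // explicit 1-5 ratings
}

// Higher rank / lower lambda generally means a closer fit to the observed ratings.
val model = ALS.train(ratings, 50, 20, 0.01)

// Predictions for observed (user, product) pairs should land roughly in the 1-5 range.
val userProducts = ratings.map { case Rating(u, p, _) => (u, p) }
val predictions = model.predict(userProducts)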
Re: NoSuchMethodError in KafkaReciever
I added https://issues.apache.org/jira/browse/SPARK-2103 to track this. I also ran into it. I don't have a fix, but, somehow I think someone with more understanding of Scala and Manifest objects might see the easy fix. On Tue, Jun 10, 2014 at 5:15 PM, mpieck mpi...@gazeta.pl wrote: Hi, I have the same problem when running Kafka to Spark Streaming pipeline from Java with explicitely specified message decoders. I had thought, that it was related to Eclipse environment, as suggested here, but it's not the case. I have coded an example based on class: https://github.com/apache/spark/blob/branch-0.9/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java and have builded shaded uber jar with all the deps and tried to run it from command line. When I use the createStream method from the example class like this: KafkaUtils.createStream(jssc, zookeeper:port, test, topicMap); everything is working fine, but when I explicitely specify message decoder classes used in this method with another overloaded createStream method: KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, props, topicMap, StorageLevels.MEMORY_AND_DISK_2); the applications stops with an error: 14/06/10 22:28:06 ERROR kafka.KafkaReceiver: Error receiving data java.lang.NoSuchMethodException: java.lang.Object.init(kafka.utils.VerifiableProperties) at java.lang.Class.getConstructor0(Unknown Source) at java.lang.Class.getConstructor(Unknown Source) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:108) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126) I have tried Spark versions 0.9.0-incubating, 0.9.0 and 0.9.1, but the error occurs everywhere. Kafka StringDecoder class has the constructor with VerifiableProperties parameter and all required classes are in the same uber jar, so it is strange that scala/java cannot find it with reflection api. Maybe there is some problem with Manifest/ClassTag usage in KafkaUtils or KafkaInputDStream classes, but I'm not a Scala expert and cannot be sure about it. The problematic code is the same from version 0.9 to the current one, so it's still there. Unit test from the Spark project is working fine with every KafkaUtils method, because the test does not try to register the kafka stream, only checks the interface. Currently it is possible to use Kafka to Spark Streaming pipeline from Java only with the default String message decoders, which makes this tool almost useless (unless you are a great JSON fan). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-in-KafkaReciever-tp2209p7347.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Information on Spark UI
The executors shown as CANNOT FIND ADDRESS are not listed in the Executors tab at the top of the Spark UI. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Information-on-Spark-UI-tp7354p7355.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark streaming, kafka, SPARK_CLASSPATH
I am using Spark 1.0.0 compiled with Hadoop 1.2.1. I have a toy spark-streaming-kafka program. It reads from a kafka queue and does stream .map {case (k, v) = (v, 1)} .reduceByKey(_ + _) .print() using a 1 second interval on the stream. The docs say to make Spark and Hadoop jars 'provided' but this breaks for spark-streaming. Including spark-streaming (and spark-streaming-kafka) as 'compile' to sweep them into our assembly gives collisions on javax.* classes. To work around this I modified $SPARK_HOME/bin/compute-classpath.sh to include spark-streaming, spark-streaming-kafka, and zkclient. (Note that kafka is included as 'compile' in my project and picked up in the assembly.) I have set up conf/spark-env.sh as needed. I have copied my assembly to /tmp/myjar.jar on all spark hosts and to my hdfs /tmp/jars directory. I am running spark-submit from my spark master. I am guided by the information here https://spark.apache.org/docs/latest/submitting-applications.html Well at this point I was going to detail all the ways spark-submit fails to follow it's own documentation. If I do not invoke sparkContext.setJars() then it just fails to find the driver class. This is using various combinations of absolute path, file:, hdfs: (Warning: Skip remote jar)??, and local: prefixes on the application-jar and --jars arguments. If I invoke sparkContext.setJars() and include my assembly jar I get further. At this point I get a failure from kafka.consumer.ConsumerConnector not being found. I suspect this is because spark-streaming-kafka needs the Kafka dependency it but my assembly jar is too late in the classpath. At this point I try setting spark.files.userClassPathfirst to 'true' but this causes more things to blow up. I finally found something that works. Namely setting environment variable SPARK_CLASSPATH=/tmp/myjar.jar But silly me, this is deprecated and I'm helpfully informed to Please instead use: - ./spark-submit with --driver-class-path to augment the driver classpath - spark.executor.extraClassPath to augment the executor classpath which when put into a file and introduced with --properties-file does not work. (Also tried spark.files.userClassPathFirst here.) These fail with the kafka.consumer.ConsumerConnector error. At a guess what's going on is that using SPARK_CLASSPATH I have my assembly jar in the classpath at SparkSubmit invocation Spark Command: java -cp /tmp/myjar.jar::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.0.0-hadoop1.2.1.jar:/opt/spark/lib/spark-streaming_2.10-1.0.0.jar:/opt/spark/lib/spark-streaming-kafka_2.10-1.0.0.jar:/opt/spark/lib/zkclient-0.4.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class me.KafkaStreamingWC /tmp/myjar.jar but using --properties-file then the assembly is not available for SparkSubmit. 
I think the root cause is either spark-submit not handling the spark-streaming libraries so they can be 'provided', or the inclusion of org.eclipse.jetty.orbit in the streaming libraries, which causes
[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.transaction/orbits/javax.transaction-1.1.1.v201105210645.jar:META-INF/ECLIPSEF.RSA
[error] /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:META-INF/ECLIPSEF.RSA
[error] /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.mail.glassfish/orbits/javax.mail.glassfish-1.4.1.v201005082020.jar:META-INF/ECLIPSEF.RSA
[error] /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.activation/orbits/javax.activation-1.1.0.v201105071233.jar:META-INF/ECLIPSEF.RSA
I've tried applying mergeStrategy in assembly for my assembly.sbt but then I get "Invalid signature file digest for Manifest main attributes". If anyone knows the magic to get this working a reply would be greatly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-SPARK-CLASSPATH-tp7356.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
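One way this is commonly handled (a sketch for an sbt-assembly 0.11.x-era assembly.sbt; exact keys and syntax vary by plugin version) is to discard the jar signature files during the merge, which typically addresses both the deduplicate error and the "Invalid signature file digest" error:

import sbtassembly.Plugin._
import AssemblyKeys._

mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
  {
    // Drop signature files from signed jars (e.g. META-INF/ECLIPSEF.RSA);
    // keeping them in a repackaged assembly is what triggers the digest error.
    case x if x.toLowerCase.endsWith(".rsa") || x.toLowerCase.endsWith(".sf") || x.toLowerCase.endsWith(".dsa") =>
      MergeStrategy.discard
    case x => old(x)
  }
}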
groupBy question
After doing a groupBy operation, I have the following result:
val res = (ID1,ArrayBuffer((145804601,ID1,japan))) (ID3,ArrayBuffer((145865080,ID3,canada), (145899640,ID3,china))) (ID2,ArrayBuffer((145752760,ID2,usa), (145934200,ID2,usa)))
Now I need to output for each group, the size of each group and the max of the first field, which is a timestamp. So, I tried the following:
1) res.map(group => (group._2.size, group._2._1.max)) But I got an error: value _1 is not a member of Iterable[(Long, String, String)]
2) I also tried: res.map(group => (group._2.size, group._2[1].max)), but got an error for that as well.
What is the right way to get the max of the timestamp field (the first field in the ArrayBuffer) for each group? thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-question-tp7357.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
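One possible way to express this (a sketch only; it assumes `res` keeps the (key, ArrayBuffer) shape shown above) is to treat the group value as a collection and map it down to the timestamp field before taking the max:

val stats = res.map { case (id, records) =>
  // records is a collection of (timestamp, id, country) tuples
  (id, records.size, records.map(_._1).max)   // (key, group size, latest timestamp)
}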
Monitoring spark dis-associated workers
We're running into an issue where periodically the master loses connectivity with workers in the spark cluster. We believe this issue tends to manifest when the cluster is under heavy load, but we're not entirely sure when it happens. I've seen one or two other messages to this list about this issue, but no one seems to have a clue as to the actual bug. So, to work around the issue, we'd like to programmatically monitor the number of workers connected to the master and restart the cluster when the master loses track of some of its workers. Any ideas on how to programmatically write such a health check? Thanks, Allen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Monitoring-spark-dis-associated-workers-tp7358.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
problem starting the history server on EC2
I created a Spark 1.0 cluster on EC2 using the provided scripts. However, I do not seem to be able to start the history server on the master node. I used the following command: ./start-history-server.sh /root/spark_log The error message says that the logging directory /root/spark_log does not exist. But I have definitely created the directory and made sure everyone can read/write/execute in the directory. Can you tell me why it does not work? Thank you Zhen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: problem starting the history server on EC2
What's the permission on /root itself? On Jun 10, 2014 6:29 PM, zhen z...@latrobe.edu.au wrote: I created a Spark 1.0 cluster on EC2 using the provided scripts. However, I do not seem to be able to start the history server on the master node. I used the following command: ./start-history-server.sh /root/spark_log The error message says that the logging directory /root/spark_log does not exist. But I have definitely created the directory and made sure everyone can read/write/execute in the directory. Can you tell me why it does not work? Thank you Zhen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
output tuples in CSV format
My output is a set of tuples and when I output it using saveAsTextFile, my file looks as follows: (field1_tup1, field2_tup1, field3_tup1,...) (field1_tup2, field2_tup2, field3_tup2,...) In Spark. is there some way I can simply have it output in CSV format as follows (i.e. without the parentheses): field1_tup1, field2_tup1, field3_tup1,... field1_tup2, field2_tup2, field3_tup2,... I could write a script to remove the parentheses, but would be easier if I could omit the parentheses. I did not find a saveAsCsvFile in Spark. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/output-tuples-in-CSV-format-tp7363.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Using Spark on Data size larger than Memory size
Thanks for the clarification. What is the proper way to configure RDDs when your aggregate data size exceeds your available working memory size? In particular, in addition to typical operations, I'm performing cogroups, joins, and coalesces/shuffles. I see that the default storage level for RDDs is MEMORY_ONLY. Do I just need to set the storage level for all of my RDDs to something like MEMORY_AND_DISK? Do I need to do anything else to get graceful behavior in the presence of coalesces/shuffles, cogroups, and joins? Thanks, Allen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-on-Data-size-larger-than-Memory-size-tp6589p7364.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
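For what it's worth, a minimal sketch of switching the storage level (the RDD names here are placeholders; whether MEMORY_AND_DISK or the serialized variant is appropriate depends on the workload):

import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel

// Spill cached partitions to local disk instead of dropping them when memory runs out.
val joined = left.join(right).persist(StorageLevel.MEMORY_AND_DISK)
// More compact in memory, at the cost of CPU for (de)serialization:
// left.join(right).persist(StorageLevel.MEMORY_AND_DISK_SER)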
Re: Information on Spark UI
We are seeing this issue as well. We run on YARN and see logs about lost executor. Looks like some stages had to be re-run to compute RDD partitions lost in the executor. We were able to complete 20 iterations with 20% full matrix but not beyond that (total 100GB). On Tue, Jun 10, 2014 at 8:32 PM, coderxiang shuoxiang...@gmail.com wrote: The executors shown CANNOT FIND ADDRESS are not listed in the Executors Tab on the top of the Spark UI. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Information-on-Spark-UI-tp7354p7355.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: output tuples in CSV format
you can just use something like this: myRdd.map(_.productIterator.mkString(",")).saveAsTextFile(...) On Tue, Jun 10, 2014 at 6:34 PM, SK skrishna...@gmail.com wrote: My output is a set of tuples and when I output it using saveAsTextFile, my file looks as follows: (field1_tup1, field2_tup1, field3_tup1,...) (field1_tup2, field2_tup2, field3_tup2,...) In Spark. is there some way I can simply have it output in CSV format as follows (i.e. without the parentheses): field1_tup1, field2_tup1, field3_tup1,... field1_tup2, field2_tup2, field3_tup2,... I could write a script to remove the parentheses, but would be easier if I could omit the parentheses. I did not find a saveAsCsvFile in Spark. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/output-tuples-in-CSV-format-tp7363.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: output tuples in CSV format
It would be better to add one more transformation step before saveAsTextFile, like: rdd.map(tuple => "%s,%s,%s".format(tuple._1, tuple._2, tuple._3)).saveAsTextFile(...) By manually converting to the format you want, and then writing to HDFS. Thanks Jerry -Original Message- From: SK [mailto:skrishna...@gmail.com] Sent: Wednesday, June 11, 2014 9:34 AM To: u...@spark.incubator.apache.org Subject: output tuples in CSV format My output is a set of tuples and when I output it using saveAsTextFile, my file looks as follows: (field1_tup1, field2_tup1, field3_tup1,...) (field1_tup2, field2_tup2, field3_tup2,...) In Spark. is there some way I can simply have it output in CSV format as follows (i.e. without the parentheses): field1_tup1, field2_tup1, field3_tup1,... field1_tup2, field2_tup2, field3_tup2,... I could write a script to remove the parentheses, but would be easier if I could omit the parentheses. I did not find a saveAsCsvFile in Spark. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/output-tuples-in-CSV-format-tp7363.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to process multiple classification with SVM in MLlib
Thanks. Now I know how to broadcast the dataset, but I still wonder, after broadcasting the dataset, how I can apply my algorithm to train the model on the workers. To describe my question in detail: the following code is used to train an LDA (Latent Dirichlet Allocation) model with JGibbLDA on a single machine; it iterates to sample the topics and train the model. After broadcasting the dataset, how can I keep this code running in Spark? Thank you.

LDACmdOption ldaOption = new LDACmdOption(); // to set the parameters of LDA
ldaOption.est = true;
ldaOption.estc = false;
ldaOption.modelName = "model-final"; // the name of the output file
ldaOption.dir = "/usr/Java";
ldaOption.dfile = "newDoc.dat"; // the input data file
ldaOption.alpha = 0.5;
ldaOption.beta = 0.1;
ldaOption.K = 10; // the number of topics
ldaOption.niters = 1000; // the number of iterations
topicNum = ldaOption.K;
Estimator estimator = new Estimator();
estimator.init(ldaOption);
estimator.estimate();

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-multiple-classification-with-SVM-in-MLlib-tp7174p7368.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: problem starting the history server on EC2
I checked the permission on root and it is the following: drwxr-xr-x 20 root root 4096 Jun 11 01:05 root So anyway, I changed to use /tmp/spark_log instead and this time I made sure that all permissions are given to /tmp and /tmp/spark_log like below. But it still does not work: drwxrwxrwt 8 root root 4096 Jun 11 02:08 tmp drwxrwxrwx 2 root root 4096 Jun 11 02:08 spark_log Thanks Zhen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361p7370.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: problem starting the history server on EC2
Can you try file:/root/spark_log? 2014-06-10 19:22 GMT-07:00 zhen z...@latrobe.edu.au: I checked the permission on root and it is the following: drwxr-xr-x 20 root root 4096 Jun 11 01:05 root So anyway, I changed to use /tmp/spark_log instead and this time I made sure that all permissions are given to /tmp and /tmp/spark_log like below. But it still does not work: drwxrwxrwt 8 root root 4096 Jun 11 02:08 tmp drwxrwxrwx 2 root root 4096 Jun 11 02:08 spark_log Thanks Zhen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361p7370.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to process multiple classification with SVM in MLlib
Someone suggested that I use Mahout, but I'm not familiar with it, and using Mahout would add complexity to my program. I'd like to run the algorithm in Spark. I'm a beginner; can you give me some suggestions? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-multiple-classification-with-SVM-in-MLlib-tp7174p7372.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Question about RDD cache, unpersist, materialization
Hi, What I (seem to) know about the RDD persisting API is as follows: - cache() and persist() are not actions. They only mark the RDD. - unpersist() is also not an action. It only removes the marking, but if the RDD is already in memory, it is unloaded. And there seems to be no API to forcefully materialize an RDD without requesting data through an action method, for example first(). So, I am faced with the following scenario.

    {
      JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create an empty RDD for merging
      for (int i = 0; i < 10; i++) {
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
        rdd.cache(); // Since it will be used twice, cache it.
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save; rdd materializes.
        rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union.
        rdd.unpersist(); // It seems no longer needed here. (But it actually is needed later.)
      }
      // Here rddUnion actually materializes and needs all 10 rdds that were already unpersisted,
      // so all 10 rdds will be rebuilt.
      rddUnion.saveAsTextFile(mergedFileName);
    }

If rddUnion could be materialized and cache()d before the rdd.unpersist() line, the rdds in the loop would not be needed by rddUnion.saveAsTextFile(). Now, what is the best strategy? - Do not unpersist the 10 rdds in the loop. - Materialize rddUnion in the loop by calling a 'light' action, like first(). - Give up and just rebuild/reload all 10 rdds when saving rddUnion. Is there some misunderstanding? Thanks.
Re: getting started with mllib.recommendation.ALS
Thanks Sean. I realized that I was supplying train() with a very low rank, so I will retry with something higher and then play with lambda as needed. On Tue, Jun 10, 2014 at 4:58 PM, Sean Owen so...@cloudera.com wrote: For trainImplicit(), the output is an approximation of a matrix of 0s and 1s, so the values are generally (though not always) in [0,1]. But for train(), you should be predicting the original input matrix as-is, as I understand it. You should get output in about the same range as the input, but again not necessarily 1-5. If it's really different, you could be underfitting. Try less lambda, more features? On Tue, Jun 10, 2014 at 4:59 PM, Sandeep Parikh sand...@clusterbeep.org wrote: Question on the input and output for ALS.train() and MatrixFactorizationModel.predict(). My input is a list of Ratings(user_id, product_id, rating) and my ratings are on a scale of 1-5 (inclusive). When I compute predictions over the superset of all (user_id, product_id) pairs, the ratings produced are on a different scale. The question is this: do I need to normalize the data coming out of predict() to my own scale, or does the input need to be different? Thanks!
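For reference, a minimal MLlib ALS sketch along these lines (the toy ratings, the object name, and the parameter values 20 / 10 / 0.01 are arbitrary illustrations, not recommendations):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    object AlsSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("als-sketch"))
        // Toy explicit ratings on a 1-5 scale.
        val ratings = sc.parallelize(Seq(
          Rating(1, 10, 5.0), Rating(1, 11, 3.0),
          Rating(2, 10, 4.0), Rating(2, 12, 1.0)))
        // train(ratings, rank, iterations, lambda): raising the rank and lowering
        // lambda generally reduces underfitting, at the risk of overfitting.
        val model = ALS.train(ratings, 20, 10, 0.01)
        // Predictions for the observed (user, product) pairs; values should be
        // roughly on the input scale, but are not clamped to 1-5.
        val predictions = model.predict(ratings.map(r => (r.user, r.product)))
        predictions.collect().foreach(println)
        sc.stop()
      }
    }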
RE: Question about RDD cache, unpersist, materialization
BTW, it is possible that rdd.first() does not compute all of the partitions, so first() cannot be used for the situation below. -Original Message- From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Wednesday, June 11, 2014 11:40 AM To: user@spark.apache.org Subject: Question about RDD cache, unpersist, materialization Hi, What I (seem to) know about the RDD persisting API is as follows: - cache() and persist() are not actions. They only mark the RDD. - unpersist() is also not an action. It only removes the marking, but if the RDD is already in memory, it is unloaded. And there seems to be no API to forcefully materialize an RDD without requesting data through an action method, for example first(). So, I am faced with the following scenario.

    {
      JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create an empty RDD for merging
      for (int i = 0; i < 10; i++) {
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
        rdd.cache(); // Since it will be used twice, cache it.
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save; rdd materializes.
        rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union.
        rdd.unpersist(); // It seems no longer needed here. (But it actually is needed later.)
      }
      // Here rddUnion actually materializes and needs all 10 rdds that were already unpersisted,
      // so all 10 rdds will be rebuilt.
      rddUnion.saveAsTextFile(mergedFileName);
    }

If rddUnion could be materialized and cache()d before the rdd.unpersist() line, the rdds in the loop would not be needed by rddUnion.saveAsTextFile(). Now, what is the best strategy? - Do not unpersist the 10 rdds in the loop. - Materialize rddUnion in the loop by calling a 'light' action, like first(). - Give up and just rebuild/reload all 10 rdds when saving rddUnion. Is there some misunderstanding? Thanks.
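A minimal Scala sketch of the "materialize rddUnion inside the loop" option (the original is Java, but the idea carries over; the object name and file names are placeholders). Unlike first(), count() visits every partition, so the cached union is fully materialized before the per-iteration input is unpersisted:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    object MaterializeUnionSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("materialize-union-sketch"))
        var union: RDD[String] = sc.parallelize(Seq.empty[String])
        for (i <- 0 until 10) {
          val rdd = sc.textFile(s"input-$i.txt").cache()
          rdd.map(_.trim).filter(_.nonEmpty).saveAsTextFile(s"output-$i")

          val previous = union
          union = previous.union(rdd.map(_.toLowerCase).filter(_.nonEmpty)).cache()
          union.count()        // touches every partition, unlike first()
          previous.unpersist() // the old union is now folded into the new cached one
          rdd.unpersist()      // the union is served from its own cache (unless evicted)
        }
        union.saveAsTextFile("merged-output")
        sc.stop()
      }
    }

The trade-off is extra cache space for the running union in exchange for not rebuilding the ten input RDDs at the end.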
Re: Problem in Spark Streaming
Have you considered the garbage collection impact, and whether it coincides with your latency spikes? You can enable GC logging by changing the Spark configuration for your job. Hi, as I searched for the keyword Total delay in the console log, the delay keeps increasing. I am not sure what this total delay means. For example, if I perform a windowed wordcount with windowSize=1ms and slidingStep=2000ms, then is the delay measured from the 10th second? A sample log is shown as follows: Total delay: 136.983 s for time 1402409331000 ms (execution: 1.711 s) -- what is the execution time? Finished TID 490 in 14 ms on (progress: 1/6) -- what is TID? And what is the progress? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7329.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
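A minimal sketch of one way to turn on GC logging from the application, assuming the spark.executor.extraJavaOptions property (available in Spark 1.0); the JVM flags are the standard HotSpot GC-logging ones and the object name and trivial job are placeholders. The same SparkConf can be handed to a StreamingContext:

    import org.apache.spark.{SparkConf, SparkContext}

    object GcLoggingSketch {
      def main(args: Array[String]): Unit = {
        // Executor JVMs launched with this conf write GC activity to their
        // stdout/stderr logs, which you can then line up with the delay spikes.
        val conf = new SparkConf()
          .setAppName("gc-logging-sketch")
          .set("spark.executor.extraJavaOptions",
            "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
        val sc = new SparkContext(conf)
        println(sc.parallelize(1 to 1000).sum()) // trivial job so executors actually start
        sc.stop()
      }
    }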
Re: problem starting the history server on EC2
No, I meant pass the path to the history server start script. 2014-06-10 19:33 GMT-07:00 zhen z...@latrobe.edu.au: Sure, here it is:

drwxrwxrwx 2 1000 root 4096 Jun 11 01:05 spark_logs

Zhen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361p7373.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
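In other words, something like the following (assuming the script is invoked as in the original message and combining it with the file: scheme suggested earlier in this thread; the directory itself must already exist):

    ./start-history-server.sh file:/root/spark_log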
Re: How to specify executor memory in EC2 ?
It might be that conf/spark-env.sh on EC2 is configured to set it to 512, and is overriding the application’s settings. Take a look in there and delete that line if possible. Matei On Jun 10, 2014, at 2:38 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: I am testing my application in an EC2 cluster of m3.medium machines. By default, only 512 MB of memory on each machine is used. I want to increase this amount and I'm trying to do it by passing the --executor-memory 2G option to the spark-submit script, but it doesn't seem to work - each machine still uses only 512 MB instead of 2 gigabytes. What am I doing wrong? How do I increase the amount of memory?
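For reference, a minimal sketch of setting executor memory from the application itself rather than on the command line (the object name and trivial job are placeholders; the spark.executor.memory property is the same setting --executor-memory writes, and as noted above a value exported from conf/spark-env.sh may still win):

    import org.apache.spark.{SparkConf, SparkContext}

    object ExecutorMemorySketch {
      def main(args: Array[String]): Unit = {
        // Same intent as passing --executor-memory 2G to spark-submit.
        val conf = new SparkConf()
          .setAppName("executor-memory-sketch")
          .set("spark.executor.memory", "2g")
        val sc = new SparkContext(conf)
        println(sc.parallelize(1 to 1000).sum()) // trivial job, just to launch executors
        sc.stop()
      }
    }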
Re: problem starting the history server on EC2
Yep, it gives tons of errors. I was able to make it work with sudo. Looks like an ownership issue. Cheers k/ On Tue, Jun 10, 2014 at 6:29 PM, zhen z...@latrobe.edu.au wrote: I created a Spark 1.0 cluster on EC2 using the provided scripts. However, I do not seem to be able to start the history server on the master node. I used the following command: ./start-history-server.sh /root/spark_log The error message says that the logging directory /root/spark_log does not exist. But I have definitely created the directory and made sure everyone can read/write/execute in it. Can you tell me why it does not work? Thank you Zhen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361.html Sent from the Apache Spark User List mailing list archive at Nabble.com.