Re: API to add/remove containers inside an application
Mailed our list - will send it to Spark Dev On Fri, Sep 5, 2014 at 11:28 AM, Rajat Gupta rgu...@qubole.com wrote: +1 on this. First step to more automated autoscaling of spark application master... On Fri, Sep 5, 2014 at 12:56 AM, Praveen Seluka psel...@qubole.com wrote: +user On Thu, Sep 4, 2014 at 10:53 PM, Praveen Seluka psel...@qubole.com wrote: Spark on Yarn has static allocation of resources. https://issues.apache.org/jira/browse/SPARK-3174 - This JIRA by Sandy is about adding and removing executors dynamically based on load. Even before doing this, can we expose an interface to add/remove executors? This would be very useful in the Spark Shell. A user could launch the Spark Shell with just a few executors, and later, when more executors are needed, call some API like sc.addExecutor(count). We could similarly have a removeExecutor API too. I have been thinking of working on this. Later, the SPARK-3174 JIRA can add intelligence on when to add/remove depending on load. Right now, the user would get control over adding and removing executors at runtime. Thoughts? Praveen
NotSerializableException: org.apache.spark.sql.hive.api.java.JavaHiveContext
Hello All, I am trying to query a Hive table using Spark SQL from my Java code, but I am getting the following error: *Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.hive.api.java.JavaHiveContext at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)* I am using Spark 1.0.2. My code snippet is as below:

JavaHiveContext hiveContext = null;
JavaSparkContext jsCtx = ..;
hiveContext = new JavaHiveContext(jsCtx);
hiveContext.hql("select col1,col2 from table1");

Usually people suggest not passing any non-serializable object to Spark closure functions (map, reduce, etc.) to avoid it getting distributed across multiple machines. But I am not using any closure functions here, so I am not sure how to handle this issue. Can you please advise how to resolve this problem? Thanks Bijoy
Re: EC2 - JNI crashes JVM with multi core instances
Hi, thanks for the reply. Actually, I think you are correct about the native library not being thread safe. However, from what I understand, different cores should start different processes, each being independent from the others. If I am not mistaken, the thread-safety issue would only start being critical when using threads. What is your opinion on this matter? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/EC2-JNI-crashes-JVM-with-multi-core-instances-tp13463p13527.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serialize input path
How about: val range = Range.getRange.toString; val notWorking = "path/output_{" + range + "}/*/*" On Fri, Sep 5, 2014 at 3:45 AM, jerryye jerr...@gmail.com wrote: Hi, I have a quick serialization issue. I'm trying to read a date range of input files and I'm getting a serialization issue when using an input path that has an object generate a date range. Specifically, my code uses DateTimeFormat in the Joda time package, which is not serializable. How do I get Spark to not lazily compute the input path and run into the serialization issue? Code:

object Range {
  val now = new DateTime
  val dateFormatter = DateTimeFormat.forPattern("MMdd")
  def dateRange(from: DateTime, to: DateTime, step: Period): Iterator[DateTime] =
    Iterator.iterate(from)(_.plus(step)).takeWhile(!_.isAfter(to))
  def getRange: String = {
    dateRange(now.minusDays(22), now, Period.days(1)).map(dateFormatter.print(_)).mkString(",")
  }
}

val notWorking = "path/output_{" + Range.getRange + "}/*/*"
val working = "path/output_{08121914,08132014,08142014,08152014,08162014,08172014,08182014,08192014,08202014,08212014,08222014,08232014,08242014,08252014,08262014,08272014,08282014,08292014,08302014,08312014,09012014,09022014,09032014,09042014}/*/*"
val lines = sc.textFile(working).count

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serialize input path
Thanks for the response Sean. As a correction, the code I provided actually ended up working. I tried to reduce my code down but I was being overzealous, and running count actually works. The minimal code that triggers the problem is this:

val userProfiles = lines.map(line => parse(line)).map { json =>
  val user = (json \ "user")
  val given_name = (user \ "given_name").extract[String]
  (user, given_name)
}

This also doesn't work using what you suggested and still has the same serialization issue. It seems like a weird bug, since doing lines.count works and even removing given_name in the closure works as well. :-/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialize-input-path-tp13519p13530.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
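For anyone hitting the same thing, a minimal sketch of one common way around it, assuming json4s-style parsing as in the snippet above and that lines is an RDD[String] (names are illustrative, not from this thread): build the non-serializable pieces (implicit Formats, Joda formatters) inside mapPartitions so they are constructed on the executors rather than captured from the driver.

import org.json4s._
import org.json4s.jackson.JsonMethods._

val userProfiles = lines.mapPartitions { iter =>
  // created per task on the executor, never serialized from the driver
  implicit val formats = DefaultFormats
  iter.map { line =>
    val json = parse(line)
    val user = json \ "user"
    val given_name = (user \ "given_name").extract[String]
    (user, given_name)
  }
}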
Re: Spark Streaming into HBase
Hmmm, even I don't understand why TheMain needs to be serializable. It might be cleaner (as in avoiding such mysterious closure issues) to actually create a separate sbt/maven project (instead of a shell) and run the streaming application from there. TD On Thu, Sep 4, 2014 at 10:25 AM, kpeng1 kpe...@gmail.com wrote: Tathagata, Thanks for all the help. It looks like the blah method doesn't need to be wrapped in a serializable object, but the main streaming calls do. I am currently running everything from spark-shell so I did not have a main function and object to wrap the streaming, map, and foreach calls. After wrapping those calls in an object and making that object Serializable, everything seems to be working.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
import org.apache.hadoop.hbase.util.Bytes

object Blaher {
  def blah(row: Array[String]) {
    val hConf = new HBaseConfiguration()
    val hTable = new HTable(hConf, "table")
    val thePut = new Put(Bytes.toBytes(row(0)))
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
    hTable.put(thePut)
  }
}

object TheMain extends Serializable {
  def run() {
    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9977, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.map(_.split(","))
    val store = words.foreachRDD(rdd => rdd.foreach(Blaher.blah))
    ssc.start()
  }
}

TheMain.run()

Though, I don't really understand why the TheMain object needs to be Serializable, but the Blaher object doesn't. On Wed, Sep 3, 2014 at 7:59 PM, Tathagata Das [via Apache Spark User List] [hidden email] wrote: This is some issue with how Scala computes closures. Here, because of the function blah, it is trying to serialize the whole function that this code is part of. Can you define the function blah outside the main function? In fact you can try putting the function in a serializable object.

object BlahFunction extends Serializable {
  def blah(row: Array[Byte]) { }
}

On a related note, opening a connection for every record in the RDD is pretty inefficient. Use rdd.foreachPartition instead - open the connection, write the whole partition, and then close the connection.
TD On Wed, Sep 3, 2014 at 4:24 PM, Kevin Peng [hidden email] wrote: Ted, Here is the full stack trace coming from spark-shell: 14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job streaming job 1409786463000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Basically, on the terminal where I run nc -lk, I type in words separated by commas and hit enter, i.e. bill,ted. On Wed, Sep 3, 2014 at 2:36 PM, Ted Yu [hidden email] wrote:
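A minimal sketch of the foreachPartition pattern TD suggests, reusing the placeholders from the snippet above (table name "table", column family "cf"); this is an illustration, not code from the thread:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

words.foreachRDD { rdd =>
  rdd.foreachPartition { rows =>
    // one connection per partition instead of one per record
    val hConf = new HBaseConfiguration()
    val hTable = new HTable(hConf, "table")
    rows.foreach { row =>
      val thePut = new Put(Bytes.toBytes(row(0)))
      thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
      hTable.put(thePut)
    }
    hTable.close()
  }
}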
PySpark on Yarn a lot of python scripts project
Hi, We are evaluating PySpark and have successfully executed examples of PySpark on Yarn. The next step we want to take: We have a python project (a bunch of python scripts using Anaconda packages). Question: What is the way to execute PySpark on Yarn when there are a lot of python files (~50)? Should they be packaged in an archive? What will the command to execute PySpark on Yarn with a lot of files look like? Currently the command looks like: ./bin/spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/wordcount.py 1000 Thanks Oleg.
Re: Multiple spark shell sessions
Hi Dhimant, We also cleaned up these needless warnings on port failover in Spark 1.1 -- see https://issues.apache.org/jira/browse/SPARK-1902 Andrew On Thu, Sep 4, 2014 at 7:38 AM, Dhimant dhimant84.jays...@gmail.com wrote: Thanks Yana, I am able to execute the application and commands via another session, and I also received another port for the UI application. Thanks, Dhimant -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-spark-shell-sessions-tp13441p13459.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Mapping Hadoop Reduce to Spark
Hi, FYI: There is a bug in Java mapPartitions - SPARK-3369 https://issues.apache.org/jira/browse/SPARK-3369. In Java, results from mapPartitions and similar functions must fit in memory. Look at the example below - it returns a List. Lukas On 1.9.2014 00:50, Matei Zaharia wrote: mapPartitions just gives you an Iterator of the values in each partition, and lets you return an Iterator of outputs. For instance, take a look at https://github.com/apache/spark/blob/master/core/src/test/java/org/apache/spark/JavaAPISuite.java#L694. Matei On August 31, 2014 at 12:26:51 PM, Steve Lewis (lordjoe2...@gmail.com mailto:lordjoe2...@gmail.com) wrote: Is there a sample of how to do this - I see 1.1 is out but cannot find samples of mapPartitions. A Java sample would be very useful. On Sat, Aug 30, 2014 at 10:30 AM, Matei Zaharia matei.zaha...@gmail.com mailto:matei.zaha...@gmail.com wrote: In 1.1, you'll be able to get all of these properties using sortByKey, and then mapPartitions on top to iterate through the key-value pairs. Unfortunately sortByKey does not let you control the Partitioner, but it's fairly easy to write your own version that does if this is important. In previous versions, the values for each key had to fit in memory (though we could have data on disk across keys), and this is still true for groupByKey, cogroup and join. Those restrictions will hopefully go away in a later release. But sortByKey + mapPartitions lets you just iterate through the key-value pairs without worrying about this. Matei On August 30, 2014 at 9:04:37 AM, Steve Lewis (lordjoe2...@gmail.com mailto:lordjoe2...@gmail.com) wrote: When programming in Hadoop it is possible to guarantee 1) All keys sent to a specific partition will be handled by the same machine (thread) 2) All keys received by a specific machine (thread) will be received in sorted order 3) These conditions will hold even if the values associated with a specific key are too large to fit in memory. In my Hadoop code I use all of these conditions - specifically with my larger data sets the size of data I wish to group exceeds the available memory. I think I understand the operation of groupBy, but my understanding is that this requires that the results for a single key, and perhaps all keys, fit on a single machine. Is there a way to perform like Hadoop and not require that an entire group fit in memory? -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
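To make the sortByKey + mapPartitions idea concrete, here is a small sketch (the data and the per-key aggregation are made up for illustration, and an existing SparkContext sc is assumed). Within each partition the keys arrive in sorted order, so one key's values can be consumed at a time without materializing the whole group:

import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

val perKeyCounts = pairs.sortByKey().mapPartitions { iter =>
  val buffered = iter.buffered
  new Iterator[(String, Int)] {
    def hasNext = buffered.hasNext
    def next() = {
      // consume all consecutive pairs for the current key, streaming
      val key = buffered.head._1
      var count = 0
      while (buffered.hasNext && buffered.head._1 == key) {
        buffered.next()
        count += 1
      }
      (key, count)
    }
  }
}

perKeyCounts.collect().foreach(println)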
Re: Viewing web UI after fact
Hi Andrew, thank you very much for your answer. Unfortunately it still doesn't work. I'm using Spark 1.0.0, and I start the history server by running sbin/start-history-server.sh dir, although I also set SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory in conf/spark-env.sh. I also tried other dirs than /tmp/spark-events which have all possible permissions enabled. Also adding file: (and file://) didn't help - the history server still shows: History Server Event Log Location: file:/tmp/spark-events/ No Completed Applications Found. Best regards, Grzegorz On Thu, Sep 4, 2014 at 8:20 PM, Andrew Or and...@databricks.com wrote: Hi Grzegorz, Sorry for the late response. Unfortunately, if the Master UI doesn't know about your applications (they are completed with respect to a different Master), then it can't regenerate the UIs even if the logs exist. You will have to use the history server for that. How did you start the history server? If you are using Spark >= 1.0, you can pass the directory as an argument to the sbin/start-history-server.sh script. Otherwise, you may need to set the following in your conf/spark-env.sh to specify the log directory: export SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=/tmp/spark-events It could also be a permissions thing. Make sure your logs in /tmp/spark-events are accessible by the JVM that runs the history server. Also, there's a chance that /tmp/spark-events is interpreted as an HDFS path depending on which Spark version you're running. To resolve any ambiguity, you may set the log path to file:/tmp/spark-events instead. But first verify whether they actually exist. Let me know if you get it working, -Andrew 2014-08-19 8:23 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com: Hi, Is there any way to view the history of application statistics in the master ui after restarting the master server? I have all logs in /tmp/spark-events/ but when I start the history server in this directory it says No Completed Applications Found. Maybe I could copy these logs to the dir used by the master server but I couldn't find any. Or maybe I'm doing something wrong launching the history server. Do you have any idea how to solve it? Thanks, Grzegorz On Thu, Aug 14, 2014 at 10:53 AM, Grzegorz Białek grzegorz.bia...@codilime.com wrote: Hi, Thank you both for your answers. Browsing using Master UI works fine. Unfortunately History Server shows No Completed Applications Found even if logs exist under the given directory, but using Master UI is enough for me. Best regards, Grzegorz On Wed, Aug 13, 2014 at 8:09 PM, Andrew Or and...@databricks.com wrote: The Spark UI isn't available through the same address; otherwise new applications won't be able to bind to it. Once the old application finishes, the standalone Master renders the after-the-fact application UI and exposes it under a different URL. To see this, go to the Master UI (master-url:8080) and click on your application in the Completed Applications table. 2014-08-13 10:56 GMT-07:00 Matei Zaharia matei.zaha...@gmail.com: Take a look at http://spark.apache.org/docs/latest/monitoring.html -- you need to launch a history server to serve the logs. Matei On August 13, 2014 at 2:03:08 AM, grzegorz-bialek ( grzegorz.bia...@codilime.com) wrote: Hi, I wanted to access the Spark web UI after the application stops. I set spark.eventLog.enabled to true and logs are available in JSON format in /tmp/spark-event but the web UI isn't available under address http://driver-node:4040 I'm running Spark in standalone mode. What should I do to access the web UI after the application ends? 
Thanks, Grzegorz -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Viewing-web-UI-after-fact-tp12023.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Recursion
Hi, Does Spark support recursive calls?
question on replicate() in blockManager.scala
Hi,

var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
  val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
  if (cachedPeers == null) {
    cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
  }
  for (peer: BlockManagerId <- cachedPeers) {
    val start = System.nanoTime
    data.rewind()
    logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is " + data.limit() + " Bytes. To node: " + peer)
    if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
        new ConnectionManagerId(peer.host, peer.port))) {
      logError("Failed to call syncPutBlock to " + peer)
    }
    logDebug("Replicated BlockId " + blockId + " once used " + (System.nanoTime - start) / 1e6 + " s; The size of the data is " + data.limit() + " bytes.")
  }
}

I get the flow of this code. But I don't find any method being called for actually writing the data into the set of peers chosen for replication. Where exactly is the replication happening? Thank you!! -Karthik
Spark that integrates with Kafka 0.7
Hi, Due to some limitations, we are having to stick to Kafka 0.7.x. We would like to use as late a version of Spark as possible, in streaming mode, that integrates with Kafka 0.7. The latest version appears to support only 0.8. Has anyone solved such a requirement? Any tips on what can be tried? FWIW, I tried to use the low-level Kafka API and write a custom receiver. This fails at compile time due to Scala dependency issues. The Scala version I have declared in pom.xml is 2.8.0 and this seems to not work with Spark Streaming version 1.0.2. Thanks Hemanth
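For reference, a bare skeleton of the custom-receiver route mentioned above, using the Spark 1.x Receiver API; the Kafka 0.7 consumer wiring is deliberately left out and would need the 0.7 client library (this is a sketch, not a tested integration):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class Kafka07Receiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {

  def onStart(): Unit = {
    // Start a background thread that creates the Kafka 0.7 consumer and
    // calls store(message) for every record it reads.
    new Thread("kafka-0.7-receiver") {
      override def run(): Unit = {
        // consumer loop goes here, e.g. store(decodedMessage)
      }
    }.start()
  }

  def onStop(): Unit = {
    // Shut the consumer down here.
  }
}

// Usage (ssc is a StreamingContext):
// val stream = ssc.receiverStream(new Kafka07Receiver(StorageLevel.MEMORY_AND_DISK_SER_2))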
Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded
Thank you, Ankur! :) But how to assign the storage level to a new vertices RDD that is mapped from an existing vertices RDD, e.g. *val newVertexRDD = graph.collectNeighborIds(EdgeDirection.Out).map{case(id:VertexId, a:Array[VertexId]) => (id, initialHashMap(a))}* The new one will be combined with that existing edges RDD (MEMORY_AND_DISK) to construct a new graph, e.g. val newGraph = Graph(newVertexRDD, graph.edges) BTW, the return of newVertexRDD.getStorageLevel is StorageLevel(true, true, false, true, 1) - what does it mean? Thanks in advance! Best, Yifan 2014-09-03 22:42 GMT+02:00 Ankur Dave ankurd...@gmail.com: At 2014-09-03 17:58:09 +0200, Yifan LI iamyifa...@gmail.com wrote: val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK) Error: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level You have to pass the StorageLevel to GraphLoader.edgeListFile: val graph = GraphLoader.edgeListFile( sc, edgesFile, minEdgePartitions = numPartitions, edgeStorageLevel = StorageLevel.MEMORY_AND_DISK, vertexStorageLevel = StorageLevel.MEMORY_AND_DISK) .partitionBy(PartitionStrategy.EdgePartition2D) Ankur
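Not from Ankur's reply, but one possible way to handle the derived vertex RDD: it is an ordinary RDD, so its storage level can be set with persist() before the new graph is built (initialHashMap below is the user's own function from the mail above). If memory serves, the printed StorageLevel(true, true, false, true, 1) is simply the flag tuple (useDisk, useMemory, useOffHeap, deserialized, replication), i.e. MEMORY_AND_DISK with a single replica.

import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

val newVertexRDD = graph.collectNeighborIds(EdgeDirection.Out)
  .map { case (id: VertexId, a: Array[VertexId]) => (id, initialHashMap(a)) }
  .persist(StorageLevel.MEMORY_AND_DISK)   // set the level before the RDD is first computed

val newGraph = Graph(newVertexRDD, graph.edges)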
Running spark-shell (or queries) over the network (not from master)
Is this possible? If I have a cluster set up on EC2 and I want to run spark-shell --master my-master-IP-on-EC2:7077 from my home computer - is this possible at all, or am I wasting my time ;)? I am seeing a connection timeout when I try it. Thanks! Ognen - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
New sbt plugin to deploy jobs to EC2
As far as I know, in order to deploy and execute jobs in EC2 you need to assemble your project, copy your jar into the cluster, log in using ssh and submit the job. To avoid having to do this I've been prototyping an sbt plugin(1) that allows you to create and send Spark jobs to an Amazon EC2 cluster directly from your local machine using sbt. It's a simple plugin that actually relies on spark-ec2 and spark-submit, but I'd like to have feedback and see if this plugin makes any sense before going ahead with the final impl, or if there is any other easy way to do so. (1) https://github.com/felixgborrego/sbt-spark-ec2-plugin Thanks,
Re: New sbt plugin to deploy jobs to EC2
\o/ = will test it soon or sooner, gr8 idea btw aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Fri, Sep 5, 2014 at 12:37 PM, Felix Garcia Borrego fborr...@gilt.com wrote: As far as I know in other to deploy and execute jobs in EC2 you need to assembly you project, copy your jar into the cluster, log into using ssh and submit the job. To avoid having to do this I've been prototyping an sbt plugin(1) that allows to create and send Spark jobs to an Amazon EC2 cluster directly from your local machine using sbt. It's a simple plugin that actually rely on spark-ec2 and spark-submit, but I'd like to have feedback and see if this plugin makes any sense before going ahead with the final impl or if there is any other easy way to do so. (1) https://github.com/felixgborrego/sbt-spark-ec2-plugin Thanks,
spark 1.1.0 requested array size exceed vm limits
I am building spark-1.0-rc4 with maven, following http://spark.apache.org/docs/latest/building-with-maven.html. But when running the GraphX edgeListFile loader, some tasks failed with "error: requested array size exceeds VM limits" and "error: executor lost". Does anyone know how to fix it?
RE: Web UI
Hello, Thanks for the explanation. So events are stored internally as JSON, but there is no official support for having Spark serve that JSON via HTTP? So if I wanted to write an app that monitors Spark, I would either have to scrape the web UI in HTML or rely on unofficial JSON features? That is quite surprising, because I would expect dumping out the JSON would be easier for Spark developers to implement than converting it to HTML. Do I get that right? Should I make a feature request? Thanks! Best, Oliver From: Andrew Or [mailto:and...@databricks.com] Sent: Thursday, September 04, 2014 2:11 PM To: Ruebenacker, Oliver A Cc: Akhil Das; Wonha Ryu; user@spark.apache.org Subject: Re: Web UI Hi all, The JSON version of the web UI is not officially supported; I don't believe this is documented anywhere. The alternative is to set `spark.eventLog.enabled` to true before running your application. This will create JSON SparkListenerEvents with details about each task and stage as a log file. Then you can easily reconstruct the web UI after the application has exited. This is what the standalone Master and the History Server do, actually. For local mode, you can use the latter to generate your UI after the fact. (This is documented here: http://spark.apache.org/docs/latest/monitoring.html). -Andrew 2014-09-04 5:28 GMT-07:00 Ruebenacker, Oliver A oliver.ruebenac...@altisource.com: Hello, Thanks for the link - this is for standalone, though, and most URLs don't work for local. I will look into deploying as standalone on a single node for testing and development. Best, Oliver From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Thursday, September 04, 2014 3:09 AM To: Ruebenacker, Oliver A Cc: Wonha Ryu; user@spark.apache.org Subject: Re: Web UI Hi You can see this doc https://spark.apache.org/docs/latest/spark-standalone.html#configuring-ports-for-network-security for all the available webUI ports. Yes, there are ways to get the data metrics in JSON format. One of them is below: http://webUI:8080/json/ Or simply curl webUI:8080/json/ There are some PRs about it; you can read them over here https://github.com/apache/spark/pull/1682 Thanks Best Regards On Thu, Sep 4, 2014 at 2:24 AM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: Hello, Interestingly, http://localhost:4040/metrics/json/ gives some numbers, but only a few which never seem to change during the application's lifetime. Either the web UI has some very strange limitations, or there are some URLs yet to be discovered that do something interesting. Best, Oliver From: Wonha Ryu [mailto:wonha@gmail.com] Sent: Wednesday, September 03, 2014 4:27 PM To: Ruebenacker, Oliver A Cc: user@spark.apache.org Subject: Re: Web UI Hey Oliver, IIRC there's no JSON endpoint for the application web UI. They only exist for cluster master and worker. - Wonha On Wed, Sep 3, 2014 at 12:58 PM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: Hello, Thanks for the help! But I tried starting with "--master local[4]" and when I load http://localhost:4040/json I just get forwarded to http://localhost:4040/stages/, and it's all human-readable HTML, no JSON. 
Best, Oliver From: Wonha Ryu [mailto:wonha@gmail.com] Sent: Wednesday, September 03, 2014 3:36 PM To: Ruebenacker, Oliver A Cc: user@spark.apache.org Subject: Re: Web UI Hi Oliver, Spark standalone master and worker support a '/json' endpoint in the web UI, which returns some of the information in JSON format. I wasn't able to find relevant documentation, though. - Wonha On Wed, Sep 3, 2014 at 12:12 PM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: Hello, What is included in the Spark web UI? What are the available URLs? Can the information be obtained in a machine-readable way (e.g. JSON, XML, etc)? Thanks! Best, Oliver Oliver Ruebenacker | Solutions Architect Altisource™ 290 Congress St, 7th Floor | Boston, Massachusetts 02210 P: (617) 728-5582 | ext: 275585 oliver.ruebenac...@altisource.com | www.Altisource.com *** This email message and any attachments are intended solely for the use of the addressee. If you are not the intended recipient, you are prohibited from reading, disclosing, reproducing, distributing, disseminating or otherwise using this transmission. If you have received this
Task not serializable
Hi, I'm trying to migrate a map-reduce program to work with Spark. I migrated the program from Java to Scala. The map-reduce program basically loads an HDFS file, and for each line in the file it applies several transformation functions available in various external libraries. When I execute this over Spark, it throws Task not serializable exceptions for each and every class being used from these external libraries. I added serialization to a few classes which are in my scope, but there are several other classes which are out of my scope, like org.apache.hadoop.io.Text. How do I overcome these exceptions? ~Sarath.
Re: Task not serializable
You can bring those classes out of the library and serialize them (implement Serializable). It is not the right way of doing it, though it solved a few of my similar problems. Thanks Best Regards On Fri, Sep 5, 2014 at 7:36 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi, I'm trying to migrate a map-reduce program to work with Spark. I migrated the program from Java to Scala. The map-reduce program basically loads an HDFS file, and for each line in the file it applies several transformation functions available in various external libraries. When I execute this over Spark, it throws Task not serializable exceptions for each and every class being used from these external libraries. I added serialization to a few classes which are in my scope, but there are several other classes which are out of my scope, like org.apache.hadoop.io.Text. How do I overcome these exceptions? ~Sarath.
Re: Task not serializable
Hi Akhil, I've done this for the classes which are in my scope. But what to do with classes that are out of my scope? For example org.apache.hadoop.io.Text. Also I'm using several 3rd party libraries like jeval. ~Sarath On Fri, Sep 5, 2014 at 7:40 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can bring those classes out of the library and serialize them (implement Serializable). It is not the right way of doing it, though it solved a few of my similar problems. Thanks Best Regards On Fri, Sep 5, 2014 at 7:36 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi, I'm trying to migrate a map-reduce program to work with Spark. I migrated the program from Java to Scala. The map-reduce program basically loads an HDFS file, and for each line in the file it applies several transformation functions available in various external libraries. When I execute this over Spark, it throws Task not serializable exceptions for each and every class being used from these external libraries. I added serialization to a few classes which are in my scope, but there are several other classes which are out of my scope, like org.apache.hadoop.io.Text. How do I overcome these exceptions? ~Sarath.
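A generic sketch of the usual workaround (ExternalTransformer is a hypothetical stand-in for any non-serializable third-party class, not a real library, and lines stands for the RDD of HDFS lines): construct such objects inside mapPartitions so each task builds its own instance on the executor and nothing needs to be serialized. For Hadoop Writables like org.apache.hadoop.io.Text, converting to String right after reading (e.g. .map(_.toString)) also avoids shipping the Writable itself.

val transformed = lines.mapPartitions { iter =>
  // built once per task on the executor, never shipped from the driver
  val transformer = new ExternalTransformer()
  iter.map(line => transformer.transform(line))
}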
Spark-cassandra-connector 1.0.0-rc5: java.io.NotSerializableException
Hi, My version of Spark is 1.0.2. I am trying to use spark-cassandra-connector to execute an update csql statement inside a CassandraConnector(conf).withSessionDo block:

CassandraConnector(conf).withSessionDo { session => {
  myRdd.foreach { case (ip, values) =>
    session.execute( {An csql update statement} )
  }
} }

The above code is in the main method of the driver object. When I run the above code (in local mode), I get an exception: java.io.NotSerializableException: com.datastax.spark.connector.cql.SessionProxy Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.datastax.spark.connector.cql.SessionProxy at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:906) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:903) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:903) Is there a way to use an RDD inside a CassandraConnector(conf).withSessionDo block? Thanks in advance for any assistance! Shing - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
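One pattern that is commonly suggested for this (a sketch, not verified against the connector docs): keep the CassandraConnector, which is designed to be serializable, in the closure, and open the session per partition on the workers instead of capturing a session created on the driver.

import com.datastax.spark.connector.cql.CassandraConnector

val connector = CassandraConnector(conf)

myRdd.foreachPartition { rows =>
  connector.withSessionDo { session =>
    rows.foreach { case (ip, values) =>
      // placeholder: build the actual csql update from ip and values
      session.execute("...an update statement built from ip and values...")
    }
  }
}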
Re: TimeStamp selection with SparkSQL
My approach may be partly influenced by my limited experience with SQL and Hive, but I just converted all my dates to seconds-since-epoch and then selected samples from specific time ranges using integer comparisons. On Thu, Sep 4, 2014 at 6:38 PM, Cheng, Hao hao.ch...@intel.com wrote: There are 2 SQL dialects, one is a very basic SQL support and another is Hive QL. In most cases I think people prefer using HQL, which also means you have to use HiveContext instead of the SQLContext. In this particular query you showed, it seems datetime is of type Date; unfortunately, neither of those SQL dialects supports Date, only Timestamp. Cheng Hao *From:* Benjamin Zaitlen [mailto:quasi...@gmail.com] *Sent:* Friday, September 05, 2014 5:37 AM *To:* user@spark.apache.org *Subject:* TimeStamp selection with SparkSQL I may have missed this, but is it possible to select on datetime in a SparkSQL query? jan1 = sqlContext.sql("SELECT * FROM Stocks WHERE datetime = '2014-01-01'") Additionally, is there a guide as to what SQL is valid? The guide says, "Note that Spark SQL currently uses a very basic SQL parser". It would be great to post what is currently supported. --Ben
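A small sketch of the seconds-since-epoch approach described above, in Scala and with made-up table/column names (it assumes the data was registered as a table stocks with a numeric epoch column):

import java.text.SimpleDateFormat

val fmt = new SimpleDateFormat("yyyy-MM-dd")
val jan1Epoch = fmt.parse("2014-01-01").getTime / 1000

// integer comparison instead of an (unsupported) Date comparison
val jan1 = sqlContext.sql(s"SELECT * FROM stocks WHERE epoch >= $jan1Epoch")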
replicated rdd storage problem
Hi, Whenever I replicate an RDD, I find that the RDD gets replicated only on one node. I have a 3 node cluster. I set rdd.persist(StorageLevel.MEMORY_ONLY_2) in my application. The web UI shows that it is replicated twice. But the RDD storage details show that it is replicated only once and only on one node. Can someone tell me where I am going wrong? regards -Karthik
Re: PySpark on Yarn a lot of python scripts project
Hi Oleg, In order to simplify the process of packaging and distributing your code, you could deploy a shared storage (such as NFS), put your project in it, and mount it on all the slaves as /projects. In the spark job scripts, you can access your project by putting the path into sys.path, such as: import sys sys.path.append("/projects") import myproject Davies On Fri, Sep 5, 2014 at 1:28 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, We are evaluating PySpark and have successfully executed examples of PySpark on Yarn. The next step we want to take: We have a python project (a bunch of python scripts using Anaconda packages). Question: What is the way to execute PySpark on Yarn having a lot of python files (~50)? Should it be packaged in an archive? What will the command to execute PySpark on Yarn with a lot of files look like? Currently the command looks like: ./bin/spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/wordcount.py 1000 Thanks Oleg. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark that integrates with Kafka 0.7
After searching a little bit, I came to know that Spark 0.8 supports kafka-0.7. So, I tried to use it this way: In my pom.xml, I specified a Spark dependency as follows:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.9.3</artifactId>
  <version>0.8.1-incubating</version>
</dependency>

And a kafka dependency as follows:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka</artifactId>
  <version>0.7.2-spark</version>
  <scope>provided</scope>
</dependency>

As I have declared the dependency as provided, I downloaded this and the other files (like the SHA and MD5 files) from the cloudera repository at https://repository.cloudera.com/artifactory/repo/org/apache/kafka/kafka/0.7.2-spark/ into the local maven repository. After building my jar file, I include the classes from this kafka jar into my jar file as suggested in this thread: http://qnalist.com/questions/5008317/spark-streaming-with-kafka-noclassdeffounderror I verified that the files actually exist using jar -tf. However, when I submit my job, I am getting the following error (same as mentioned in the thread above): 14/09/05 21:45:58 INFO spark.SparkContext: Added JAR /Users/yhemanth/projects/personal/spark/spark-samples/target/spark-samples-1.0-SNAPSHOT-jar-with-dependencies.jar at http://192.168.1.5:51211/jars/spark-samples-1.0-SNAPSHOT-jar-with-dependencies.jar with timestamp 1409933758392 [WARNING] java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) at java.lang.Thread.run(Thread.java:695) Caused by: java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder at org.apache.spark.streaming.StreamingContext.kafkaStream(StreamingContext.scala:258) at org.apache.spark.streaming.api.java.JavaStreamingContext.kafkaStream(JavaStreamingContext.scala:146) at com.yhemanth.spark.KafkaStreamingSample.main(KafkaStreamingSample.java:31) ... 6 more Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder at java.net.URLClassLoader$1.run(URLClassLoader.java:202) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) ... 
9 more [WARNING] java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399) at org.apache.spark.scheduler.SparkListenerBus$$anon$2.run(SparkListenerBus.scala:40) [WARNING] java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2038) at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:496) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:157) 14/09/05 21:45:58 INFO network.ConnectionManager: Selector thread was interrupted! Can someone please help on how to debug this further, or am I doing something wrong ? Thanks hemanth On Fri, Sep 5, 2014 at 3:37 PM, Hemanth Yamijala yhema...@gmail.com wrote: Hi, Due to some limitations, we are having to stick to Kafka 0.7.x. We would like to use as latest a version of Spark in streaming mode that integrates with Kafka 0.7. The latest version supports only 0.8 it appears. Has anyone solved such a requirement ? Any tips on what can be tried ? FWIW, I tried use the low level of Kafka and write a custom receiver. This fails at compile time due to Scala dependency issues. The Scala version I have declared in pom.xml is 2.8.0 and this seems to not work with Spark Streaming version 1.0.2. Thanks Hemanth
Re: Spark that integrates with Kafka 0.7
After that long mail :-), I think I figured it out. I removed the 'provided' tag in my pom.xml and let the jars be directly included using maven's jar-with-dependencies plugin. Things started working after that. Thanks Hemanth On Fri, Sep 5, 2014 at 9:50 PM, Hemanth Yamijala yhema...@gmail.com wrote: After searching a little bit, I came to know that Spark 0.8 supports kafka-0.7. So, I tried to use it this way: In my pom.xml, specified a Spark dependency as follows: dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.9.3/artifactId version0.8.1-incubating/version /dependency And a kafka dependency as follows: dependency groupIdorg.apache.kafka/groupId artifactIdkafka/artifactId version0.7.2-spark/version scopeprovided/scope /dependency As I have declared the dependency as provided, I downloaded this the other files like SHA MD5 files from the cloudera repository at https://repository.cloudera.com/artifactory/repo/org/apache/kafka/kafka/0.7.2-spark/ into the maven local repository After building my jar file, I include the classes from this kafka jar into my jar file as suggested in this thread: http://qnalist.com/questions/5008317/spark-streaming-with-kafka-noclassdeffounderror I verified that the files actually exist using jar -tf However, when I submit my job, I am getting the following error (same as mentioned in the thread above): 14/09/05 21:45:58 INFO spark.SparkContext: Added JAR /Users/yhemanth/projects/personal/spark/spark-samples/target/spark-samples-1.0-SNAPSHOT-jar-with-dependencies.jar at http://192.168.1.5:51211/jars/spark-samples-1.0-SNAPSHOT-jar-with-dependencies.jar with timestamp 1409933758392 [WARNING] java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) at java.lang.Thread.run(Thread.java:695) Caused by: java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder at org.apache.spark.streaming.StreamingContext.kafkaStream(StreamingContext.scala:258) at org.apache.spark.streaming.api.java.JavaStreamingContext.kafkaStream(JavaStreamingContext.scala:146) at com.yhemanth.spark.KafkaStreamingSample.main(KafkaStreamingSample.java:31) ... 6 more Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder at java.net.URLClassLoader$1.run(URLClassLoader.java:202) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) ... 
9 more [WARNING] java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399) at org.apache.spark.scheduler.SparkListenerBus$$anon$2.run(SparkListenerBus.scala:40) [WARNING] java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2038) at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:496) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:157) 14/09/05 21:45:58 INFO network.ConnectionManager: Selector thread was interrupted! Can someone please help on how to debug this further, or am I doing something wrong ? Thanks hemanth On Fri, Sep 5, 2014 at 3:37 PM, Hemanth Yamijala yhema...@gmail.com wrote: Hi, Due to some limitations, we are having to stick to Kafka 0.7.x. We would like to use as latest a version of Spark in streaming mode that integrates with Kafka 0.7. The latest version supports only 0.8 it appears. Has anyone solved such a requirement ? Any tips on what can be tried ? FWIW, I tried use the low level of Kafka and write a custom receiver. This fails at compile time due to Scala dependency issues. The Scala version I have declared in pom.xml is 2.8.0 and this seems to not work with Spark Streaming version 1.0.2. Thanks Hemanth
Re: How to list all registered tables in a sql context?
Err... there's no such feature? Jianshi On Wed, Sep 3, 2014 at 7:03 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, How can I list all registered tables in a sql context? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: TimeStamp selection with SparkSQL
Preprocessing (after loading the data into HDFS). I started with data in JSON format in text files (stored in HDFS), and then loaded the data into parquet files with a bit of preprocessing and now I always retrieve the data by creating a SchemaRDD from the parquet file and using the SchemaRDD to back a table in a SQLContext. On Fri, Sep 5, 2014 at 9:53 AM, Benjamin Zaitlen quasi...@gmail.com wrote: Hi Brad, When you do the conversion is this a Hive/Spark job or is it a pre-processing step before loading into HDFS? ---Ben On Fri, Sep 5, 2014 at 10:29 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: My approach may be partly influenced by my limited experience with SQL and Hive, but I just converted all my dates to seconds-since-epoch and then selected samples from specific time ranges using integer comparisons. On Thu, Sep 4, 2014 at 6:38 PM, Cheng, Hao hao.ch...@intel.com wrote: There are 2 SQL dialects, one is a very basic SQL support and another is Hive QL. In most of cases I think people prefer using the HQL, which also means you have to use HiveContext instead of the SQLContext. In this particular query you showed, seems datatime is the type Date, unfortunately, neither of those SQL dialect supports Date, but Timestamp. Cheng Hao *From:* Benjamin Zaitlen [mailto:quasi...@gmail.com] *Sent:* Friday, September 05, 2014 5:37 AM *To:* user@spark.apache.org *Subject:* TimeStamp selection with SparkSQL I may have missed this but is it possible to select on datetime in a SparkSQL query jan1 = sqlContext.sql(SELECT * FROM Stocks WHERE datetime = '2014-01-01') Additionally, is there a guide as to what SQL is valid? The guide says, Note that Spark SQL currently uses a very basic SQL parser It would be great to post what is currently supported. --Ben
Re: PySpark on Yarn a lot of python scripts project
Hi: Curious... is there any reason not to use one of the below pyspark options (in red)? Assuming each file is, say 10k in size, is 50 files too much? Does that touch on some practical limitation? Usage: ./bin/pyspark [options] Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. --deploy-mode DEPLOY_MODE Where to run the driver program: either client to run on the local machine, or cluster to run inside cluster. --class CLASS_NAME Your application's main class (for Java / Scala apps). --name NAME A name of your application. --jars JARS Comma-separated list of local jars to include on the driver and executor classpaths. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. [ ... snip ... ] On 09/05/2014 12:00 PM, Davies Liu wrote: Hi Oleg, In order to simplify the process of package and distribute you codes, you could deploy an shared storage (such as NFS), and put your project in it, mount it to all the slaves as /projects. In the spark job scripts, you can access your project by put the path into sys.path, such as: import sys sys.path.append(/projects) import myproject Davies On Fri, Sep 5, 2014 at 1:28 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , We avaluating PySpark and successfully executed examples of PySpark on Yarn. Next step what we want to do: We have a python project ( bunch of python script using Anaconda packages). Question: What is the way to execute PySpark on Yarn having a lot of python files ( ~ 50)? Should it be packaged in archive? How the command to execute Pyspark on Yarn with a lot of files will looks like? Currently command looks like: ./bin/spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/wordcount.py 1000 Thanks Oleg. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: error: type mismatch while Union
Which version are you using -- I can reproduce your issue w/ 0.9.2 but not with 1.0.1... so my guess is that it's a bug and the fix hasn't been backported... No idea on a workaround though.. On Fri, Sep 5, 2014 at 7:58 AM, Dhimant dhimant84.jays...@gmail.com wrote: Hi, I am getting a type mismatch error during a union operation. Can someone suggest a solution?

case class MyNumber(no: Int, secondVal: String) extends Serializable with Ordered[MyNumber] {
  override def toString(): String = this.no.toString + " " + this.secondVal
  override def compare(that: MyNumber): Int = this.no compare that.no
  override def compareTo(that: MyNumber): Int = this.no compare that.no
  def Equals(that: MyNumber): Boolean = {
    (this.no == that.no) && (that match {
      case MyNumber(n1, n2) => n1 == no && n2 == secondVal
      case _ => false
    })
  }
}

val numbers = sc.parallelize(1 to 20, 10)
val firstRdd = numbers.map(new MyNumber(_, "A"))
val secondRDD = numbers.map(new MyNumber(_, "B"))
val numberRdd = firstRdd.union(secondRDD)

console:24: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[MyNumber]
 required: org.apache.spark.rdd.RDD[MyNumber]
       val numberRdd = onenumberRdd.union(anotherRDD)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-type-mismatch-while-Union-tp13547.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Spark Streaming] Tracking/solving 'block input not found'
Hey Gerard, Spark Streaming should just queue the processing and not delete the block data. There are reports of this error and I am still unable to reproduce the problem. One workaround you can try is the configuration spark.streaming.unpersist = false. This stops Spark Streaming from cleaning up old blocks. See the Spark configuration page for more details. TD On Thu, Sep 4, 2014 at 6:33 AM, Gerard Maas gerard.m...@gmail.com wrote: Hello Sparkers, I'm currently running load tests on a Spark Streaming job. When the task duration increases beyond the batchDuration, the job becomes unstable. In the logs I see tasks failed with the following message: Job aborted due to stage failure: Task 266.0:1 failed 4 times, most recent failure: Exception failure in TID 19929 on host dnode-0.hdfs.private: java.lang.Exception: Could not compute split, block input-2-140983593 not found org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) I understand it's not healthy that the task execution duration is longer than the batchDuration, but I guess we should be able to support peaks. I'm wondering whether this is Spark Streaming's 'graceful degradation', or whether data is being lost at that moment? What is the reason for the lost block and what is the recommended approach to deal with this? Thanks in advance, Gerard.
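For reference, a one-line sketch of TD's workaround (the application name is a placeholder), set before the StreamingContext is created:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-load-test")
  .set("spark.streaming.unpersist", "false")  // keep old received blocks around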
Re: PySpark on Yarn a lot of python scripts project
Ok, I didn't explain myself correctly: In the case of Java having a lot of classes, a jar should be used. All the examples for PySpark I found are one py script (Pi, wordcount ...), but in a real environment analytics has more than one py file. My question is how to use PySpark on Yarn for analytics with multiple python files. I am not so sure that using comma-separated python files is a good option in my case (we have quite a lot of files). In the case of using the zip option: Is it just a zip of all python files, like a jar in Java? In Java there is a Manifest file which points to the main method. Is the zip option best practice, or are there other techniques? Thanks Oleg. On Sat, Sep 6, 2014 at 1:01 AM, Dimension Data, LLC. subscripti...@didata.us wrote: Hi: Curious... is there any reason not to use one of the below pyspark options (in red)? Assuming each file is, say 10k in size, is 50 files too much? Does that touch on some practical limitation? Usage: ./bin/pyspark [options] Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. --deploy-mode DEPLOY_MODE Where to run the driver program: either client to run on the local machine, or cluster to run inside cluster. --class CLASS_NAME Your application's main class (for Java / Scala apps). --name NAME A name of your application. --jars JARS Comma-separated list of local jars to include on the driver and executor classpaths. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. [ ... snip ... ] On 09/05/2014 12:00 PM, Davies Liu wrote: Hi Oleg, In order to simplify the process of packaging and distributing your code, you could deploy a shared storage (such as NFS), put your project in it, and mount it on all the slaves as /projects. In the spark job scripts, you can access your project by putting the path into sys.path, such as: import sys sys.path.append("/projects") import myproject Davies On Fri, Sep 5, 2014 at 1:28 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, We are evaluating PySpark and have successfully executed examples of PySpark on Yarn. The next step we want to take: We have a python project (a bunch of python scripts using Anaconda packages). Question: What is the way to execute PySpark on Yarn having a lot of python files (~50)? Should it be packaged in an archive? What will the command to execute PySpark on Yarn with a lot of files look like? Currently the command looks like: ./bin/spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/wordcount.py 1000 Thanks Oleg.
spark-streaming-kafka with broadcast variable
I need to use a broadcast variable inside the Decoder I use for class parameter T in org.apache.spark.streaming.kafka.KafkaUtils.createStream. I am using the override with this signature: createStream(JavaStreamingContext jssc, Class<K> keyTypeClass, Class<V> valueTypeClass, Class<U> keyDecoderClass, Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams, java.util.Map<String,Integer> topics, StorageLevel storageLevel) Anyone know how I might do that? The actual Decoder instance is instantiated by Spark, so I don’t know how to access a broadcast variable inside the fromBytes method. thanks Penny
Re: PySpark on Yarn a lot of python scripts project
Hi Oleg, We do support serving python files in zips. If you use --py-files, you can provide a comma delimited list of zips instead of python files. This will allow you to automatically add these files to the python path on the executors without you having to manually copy them to every single slave node. Andrew 2014-09-05 10:50 GMT-07:00 Davies Liu dav...@databricks.com: On Fri, Sep 5, 2014 at 10:21 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Ok , I didn't explain my self correct: In case of java having a lot of classes jar should be used. All examples for PySpark I found is one py script( Pi , wordcount ...) , but in real environment analytics has more then one py file. My question is how to use PySpark on Yarn analytics in case multiple python files. I a not so sure that using coma separated python files is a good option in my case ( we have quite a lot of files). In case of using zip option: Is it just a zip all python files like in jar in java? In java there is a Manifest file which points to the main method? Is the zip option best practice or there are other techniques? In daily development, it's common to modify your projects and re-run the jobs. If using zip or egg to package your code, you need to do this every time after modification, I think it will be boring. If the code is storaged and shared to the slaves via an shared file system, then it's pretty easy to modify and re-run your job, just like in local machine. Thanks Oleg. On Sat, Sep 6, 2014 at 1:01 AM, Dimension Data, LLC. subscripti...@didata.us wrote: Hi: Curious... is there any reason not to use one of the below pyspark options (in red)? Assuming each file is, say 10k in size, is 50 files too much? Does that touch on some practical limitation? Usage: ./bin/pyspark [options] Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. --deploy-mode DEPLOY_MODE Where to run the driver program: either client to run on the local machine, or cluster to run inside cluster. --class CLASS_NAME Your application's main class (for Java / Scala apps). --name NAME A name of your application. --jars JARS Comma-separated list of local jars to include on the driver and executor classpaths. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. [ ... snip ... ] On 09/05/2014 12:00 PM, Davies Liu wrote: Hi Oleg, In order to simplify the process of package and distribute you codes, you could deploy an shared storage (such as NFS), and put your project in it, mount it to all the slaves as /projects. In the spark job scripts, you can access your project by put the path into sys.path, such as: import sys sys.path.append(/projects) import myproject Davies On Fri, Sep 5, 2014 at 1:28 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , We avaluating PySpark and successfully executed examples of PySpark on Yarn. Next step what we want to do: We have a python project ( bunch of python script using Anaconda packages). Question: What is the way to execute PySpark on Yarn having a lot of python files ( ~ 50)? Should it be packaged in archive? How the command to execute Pyspark on Yarn with a lot of files will looks like? Currently command looks like: ./bin/spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/wordcount.py 1000 Thanks Oleg. 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PySpark on Yarn a lot of python scripts project
On Fri, Sep 5, 2014 at 10:50 AM, Davies Liu dav...@databricks.com wrote: In daily development, it's common to modify your projects and re-run the jobs. If using zip or egg to package your code, you need to do this every time after modification, I think it will be boring. That's why shell scripts were invented. :-) Probably a lot easier than setting up and maintaining shared storage in a large cluster. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Viewing web UI after fact
Hi Grzegorz, Can you verify that there are APPLICATION_COMPLETE files in the event log directories? E.g. Does file:/tmp/spark-events/app-name-1234567890/APPLICATION_COMPLETE exist? If not, it could be that your application didn't call sc.stop(), so the ApplicationEnd event is not actually logged. The HistoryServer looks for this special file to identify applications to display. You could also try manually adding the APPLICATION_COMPLETE file to this directory; the HistoryServer should pick this up and display the application, though the information displayed will be incomplete because the log did not capture all the events (sc.stop() does a final close() on the file written). Andrew 2014-09-05 1:50 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com: Hi Andrew, thank you very much for your answer. Unfortunately it still doesn't work. I'm using Spark 1.0.0, and I start the history server by running sbin/start-history-server.sh <dir>, although I also set SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory in conf/spark-env.sh. I also tried other dirs than /tmp/spark-events which have all possible permissions enabled. Also adding file: (and file://) didn't help - the history server still shows: History Server Event Log Location: file:/tmp/spark-events/ No Completed Applications Found. Best regards, Grzegorz On Thu, Sep 4, 2014 at 8:20 PM, Andrew Or and...@databricks.com wrote: Hi Grzegorz, Sorry for the late response. Unfortunately, if the Master UI doesn't know about your applications (they are completed with respect to a different Master), then it can't regenerate the UIs even if the logs exist. You will have to use the history server for that. How did you start the history server? If you are using Spark >= 1.0, you can pass the directory as an argument to the sbin/start-history-server.sh script. Otherwise, you may need to set the following in your conf/spark-env.sh to specify the log directory: export SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=/tmp/spark-events It could also be a permissions thing. Make sure your logs in /tmp/spark-events are accessible by the JVM that runs the history server. Also, there's a chance that /tmp/spark-events is interpreted as an HDFS path depending on which Spark version you're running. To resolve any ambiguity, you may set the log path to file:/tmp/spark-events instead. But first verify whether they actually exist. Let me know if you get it working, -Andrew 2014-08-19 8:23 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com: Hi, Is there any way to view the history of application statistics in the master UI after restarting the master server? I have all logs in /tmp/spark-events/ but when I start the history server in this directory it says No Completed Applications Found. Maybe I could copy these logs to the dir used by the master server, but I couldn't find any. Or maybe I'm doing something wrong launching the history server. Do you have any idea how to solve it? Thanks, Grzegorz On Thu, Aug 14, 2014 at 10:53 AM, Grzegorz Białek grzegorz.bia...@codilime.com wrote: Hi, Thank you both for your answers. Browsing using the Master UI works fine. Unfortunately the History Server shows No Completed Applications Found even though logs exist under the given directory, but using the Master UI is enough for me. Best regards, Grzegorz On Wed, Aug 13, 2014 at 8:09 PM, Andrew Or and...@databricks.com wrote: The Spark UI isn't available through the same address; otherwise new applications won't be able to bind to it.
Once the old application finishes, the standalone Master renders the after-the-fact application UI and exposes it under a different URL. To see this, go to the Master UI (master-url:8080) and click on your application in the Completed Applications table. 2014-08-13 10:56 GMT-07:00 Matei Zaharia matei.zaha...@gmail.com: Take a look at http://spark.apache.org/docs/latest/monitoring.html -- you need to launch a history server to serve the logs. Matei On August 13, 2014 at 2:03:08 AM, grzegorz-bialek ( grzegorz.bia...@codilime.com) wrote: Hi, I wanted to access the Spark web UI after the application stops. I set spark.eventLog.enabled to true and the logs are available in JSON format in /tmp/spark-event, but the web UI isn't available at http://driver-node:4040 I'm running Spark in standalone mode. What should I do to access the web UI after the application ends? Thanks, Grzegorz -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Viewing-web-UI-after-fact-tp12023.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
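For reference, a minimal Scala sketch of what the advice above amounts to; the app name and the log directory are placeholders, and the directory must match what the history server (or standalone Master) is pointed at:

import org.apache.spark.{SparkConf, SparkContext}

// Enable event logging so the standalone Master / history server can rebuild
// the UI after the application exits. /tmp/spark-events is an assumed location.
val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "file:/tmp/spark-events")

val sc = new SparkContext(conf)
try {
  // ... run the job ...
} finally {
  // sc.stop() logs the ApplicationEnd event and closes the log file,
  // which is what lets the history server mark the application as completed.
  sc.stop()
}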
Re: Web UI
Sure, you can request it by filing an issue here: https://issues.apache.org/jira/browse/SPARK 2014-09-05 6:50 GMT-07:00 Ruebenacker, Oliver A oliver.ruebenac...@altisource.com: Hello, Thanks for the explanation. So events are stored internally as JSON, but there is no official support for having Spark serve that JSON via HTTP? So if I wanted to write an app that monitors Spark, I would either have to scrape the web UI in HTML or rely on unofficial JSON features? That is quite surprising, because I would expect dumping out the JSON would be easier for Spark developers to implement than converting it to HTML. Do I get that right? Should I make a feature request? Thanks! Best, Oliver *From:* Andrew Or [mailto:and...@databricks.com] *Sent:* Thursday, September 04, 2014 2:11 PM *To:* Ruebenacker, Oliver A *Cc:* Akhil Das; Wonha Ryu; user@spark.apache.org *Subject:* Re: Web UI Hi all, The JSON version of the web UI is not officially supported; I don't believe this is documented anywhere. The alternative is to set `spark.eventLog.enabled` to true before running your application. This will write JSON SparkListenerEvents with details about each task and stage to a log file. Then you can easily reconstruct the web UI after the application has exited. This is what the standalone Master and the History Server do, actually. For local mode, you can use the latter to generate your UI after the fact. (This is documented here: http://spark.apache.org/docs/latest/monitoring.html). -Andrew 2014-09-04 5:28 GMT-07:00 Ruebenacker, Oliver A oliver.ruebenac...@altisource.com: Hello, Thanks for the link - this is for standalone, though, and most URLs don't work for local. I will look into deploying as standalone on a single node for testing and development. Best, Oliver *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Thursday, September 04, 2014 3:09 AM *To:* Ruebenacker, Oliver A *Cc:* Wonha Ryu; user@spark.apache.org *Subject:* Re: Web UI Hi You can see this doc https://spark.apache.org/docs/latest/spark-standalone.html#configuring-ports-for-network-security for all the available webUI ports. Yes, there are ways to get the data metrics in JSON format. One of them is below: http://webUI:8080/json/ or simply: curl webUI:8080/json/ There are some PRs about it; you can read them over here: https://github.com/apache/spark/pull/1682 Thanks Best Regards On Thu, Sep 4, 2014 at 2:24 AM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: Hello, Interestingly, http://localhost:4040/metrics/json/ gives some numbers, but only a few which never seem to change during the application's lifetime. Either the web UI has some very strange limitations, or there are some URLs yet to be discovered that do something interesting. Best, Oliver *From:* Wonha Ryu [mailto:wonha@gmail.com] *Sent:* Wednesday, September 03, 2014 4:27 PM *To:* Ruebenacker, Oliver A *Cc:* user@spark.apache.org *Subject:* Re: Web UI Hey Oliver, IIRC there's no JSON endpoint for the application web UI. They only exist for the cluster master and worker. - Wonha On Wed, Sep 3, 2014 at 12:58 PM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: Hello, Thanks for the help! But I tried starting with --master local[4] and when I load http://localhost:4040/json I just get forwarded to http://localhost:4040/stages/, and it's all human-readable HTML, no JSON.
Best, Oliver *From:* Wonha Ryu [mailto:wonha@gmail.com] *Sent:* Wednesday, September 03, 2014 3:36 PM *To:* Ruebenacker, Oliver A *Cc:* user@spark.apache.org *Subject:* Re: Web UI Hi Oliver, Spark standalone master and worker support a '/json' endpoint in the web UI, which returns some of the information in JSON format. I wasn't able to find relevant documentation, though. - Wonha On Wed, Sep 3, 2014 at 12:12 PM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: Hello, What is included in the Spark web UI? What are the available URLs? Can the information be obtained in a machine-readable way (e.g. JSON, XML, etc)? Thanks! Best, Oliver
Re: spark-streaming-kafka with broadcast variable
I am not sure if there is a good, clean way to do that - broadcast variables are not designed to be used outside Spark job closures. You could try something a bit hacky where you write the serialized variable to a file in HDFS / NFS / a distributed file system, and then use a custom decoder class that reads that file and uses it. An even more configurable version would be a custom decoder class that makes an RPC call to the driver, which runs an RPC server, to transfer the data. TD On Fri, Sep 5, 2014 at 10:36 AM, Penny Espinoza pesp...@societyconsulting.com wrote: I need to use a broadcast variable inside the Decoder I use for class parameter T in org.apache.spark.streaming.kafka.KafkaUtils.createStream. I am using the overload with this signature (https://spark.apache.org/docs/latest/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html#createStream(org.apache.spark.streaming.api.java.JavaStreamingContext,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class,%20java.util.Map,%20java.util.Map,%20org.apache.spark.storage.StorageLevel)): createStream(JavaStreamingContext jssc, Class<K> keyTypeClass, Class<V> valueTypeClass, Class<U> keyDecoderClass, Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams, java.util.Map<String,Integer> topics, StorageLevel storageLevel) Anyone know how I might do that? The actual Decoder instance is instantiated by Spark, so I don't know how to access a broadcast variable inside the fromBytes method. thanks Penny
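A rough Scala sketch of the first workaround TD describes: a decoder that lazily loads the side data from a shared file system instead of from a broadcast variable. The HDFS path, the Event type, and the parsing logic are all placeholders, not anything from the original thread:

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical message type produced by the decoder.
case class Event(raw: String, sideDataSize: Int)

// Kafka instantiates the decoder itself, so instead of a broadcast variable
// the side data is read lazily from HDFS the first time fromBytes() needs it.
class SideDataDecoder(props: VerifiableProperties = null) extends Decoder[Event] {

  private lazy val sideData: Array[Byte] = {
    val fs = FileSystem.get(new Configuration())
    val in = fs.open(new Path("/shared/side-data.bin")) // assumed location
    try IOUtils.toByteArray(in) finally in.close()
  }

  override def fromBytes(bytes: Array[Byte]): Event = {
    // Real parsing logic would use sideData here; this just records its size.
    Event(new String(bytes, "UTF-8"), sideData.length)
  }
}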
Re: spark 1.1.0 requested array size exceed vm limits
At 2014-09-05 21:40:51 +0800, marylucy qaz163wsx_...@hotmail.com wrote: But running GraphX edgeListFile, some tasks failed with the error: requested array size exceeds VM limit. Try passing a higher value for minEdgePartitions when calling GraphLoader.edgeListFile. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
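For example, a sketch assuming an existing SparkContext sc; the path and partition count are placeholders:

import org.apache.spark.graphx.GraphLoader

// More edge partitions means smaller per-partition edge arrays, which avoids
// the "Requested array size exceeds VM limit" error on very large edge files.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt",
  minEdgePartitions = 200)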
Re: how to choose right DStream batch interval
Repost, since the original msg was marked with "This post has NOT been accepted by the mailing list yet." I have some questions regarding the DStream batch interval: 1. If it only takes 0.5 seconds to process the batch 99% of the time, but 1% of batches need 5 seconds to process (due to some random factor or failures), then what's the right batch interval? 5 seconds (the worst case)? 2. What will happen to DStream processing if one batch takes longer than the batch interval? Can Spark recover from that? Thanks, Qihong -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-choose-right-DStream-batch-interval-tp13578p13579.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: pyspark on yarn hdp hortonworks
I'm running into a problem getting this working as well. I have spark-submit and spark-shell working fine, but pyspark in interactive mode can't seem to find the lzo jar: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found This is in /usr/lib/hadoop/lib/hadoop-lzo-0.6.0.jar, which is in my SPARK_CLASSPATH environment variable, but that doesn't seem to be picked up by pyspark. Any ideas? I can't find much in the way of docs on getting the environment right for pyspark. Greg From: Andrew Or and...@databricks.com Date: Wednesday, September 3, 2014 4:19 PM To: Oleg Ruchovets oruchov...@gmail.com Cc: user@spark.apache.org Subject: Re: pyspark on yarn hdp hortonworks Hi Oleg, There isn't much you need to do to set up a Yarn cluster to run PySpark. You need to make sure all machines have python installed, and... that's about it. Your assembly jar will be shipped to all containers along with all the pyspark and py4j files needed. One caveat, however, is that the jar needs to be built with maven and not on a Red Hat-based OS: http://spark.apache.org/docs/latest/building-with-maven.html#building-for-pyspark-on-yarn In addition, it should be built with Java 6 because of a known issue with building jars with Java 7 and including python files in them (https://issues.apache.org/jira/browse/SPARK-1718). Lastly, if you have trouble getting it to work, you can follow the steps I have listed in a different thread to figure out what's wrong: http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3ccamjob8mr1+ias-sldz_rfrke_na2uubnmhrac4nukqyqnun...@mail.gmail.com%3e Let me know if you can get it working, -Andrew 2014-09-03 5:03 GMT-07:00 Oleg Ruchovets oruchov...@gmail.com: Hi all. I have been trying to run pyspark on yarn for a couple of days already: http://hortonworks.com/kb/spark-1-0-1-technical-preview-hdp-2-1-3/ I posted the exception in previous posts. It looks like I didn't do the configuration correctly. I googled quite a lot and I can't find the steps that should be done to configure PySpark to run on Yarn. Can you please share the steps (critical points) that should be configured to use PySpark on Yarn (Hortonworks distribution): environment variables, classpath, copying jars to all machines, other configuration. Thanks Oleg.
Re: Shared variable in Spark Streaming
good question, soumitra. it's a bit confusing. to break TD's code down a bit: dstream.count() is a transformation operation (returns a new DStream), executes lazily, runs in the cluster on the underlying RDDs that come through in that batch, and returns a new DStream with a single element representing the count of the underlying RDDs in each batch. dstream.foreachRDD() is an output/action operation (returns something other than a DStream - nothing in this case), triggers the lazy execution above, returns the results to the driver, and increments the globalCount locally in the driver. per your specific question, RDD.count() is different in that it's an output/action operation that materializes the RDD and collects the count of elements in the RDD locally in the driver. confusing, indeed. accumulators are updated in parallel on the worker nodes across the cluster and are read locally in the driver. On Fri, Aug 8, 2014 at 7:36 AM, Soumitra Kumar kumar.soumi...@gmail.com wrote: I want to keep track of the events processed in a batch. How come 'globalCount' works for a DStream? I think a similar construct won't work for an RDD; that's why there is the accumulator. On Fri, Aug 8, 2014 at 12:52 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Do you mean that you want a continuously updated count as more events/records are received in the DStream (remember, a DStream is a continuous stream of data)? Assuming that is what you want, you can use a global counter var globalCount = 0L dstream.count().foreachRDD(rdd => { globalCount += rdd.first() }) This globalCount variable will reside in the driver and will keep being updated after every batch. TD On Thu, Aug 7, 2014 at 10:16 PM, Soumitra Kumar kumar.soumi...@gmail.com wrote: Hello, I want to count the number of elements in the DStream, like RDD.count(). Since there is no such method in DStream, I thought of using DStream.count and using an accumulator. How do I do DStream.count() to count the number of elements in a DStream? How do I create a shared variable in Spark Streaming? -Soumitra.
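To make the contrast concrete, here is a small Scala sketch of the accumulator route Soumitra mentions, assuming an existing StreamingContext ssc and an input stream dstream (both names are assumptions):

// Accumulators are written on the executors and read on the driver,
// so this keeps a running total across batches without a driver-side var.
val recordCount = ssc.sparkContext.accumulator(0L, "records processed")

dstream.foreachRDD { rdd =>
  rdd.foreach(_ => recordCount += 1L)              // runs on the executors
  println(s"records so far: ${recordCount.value}") // read back on the driver
}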
Re: PySpark on Yarn a lot of python scripts project
Here is a story about how shared storage simplifies all these things: In Douban, we use Moose FS[1] instead of HDFS as the distributed file system; it's POSIX-compatible and can be mounted just like NFS. We put all the data and tools and code in it, so we can access them easily on all the machines, just like local disks. You can modify them anywhere, and get the modified version from anywhere. One example: you want to know the fields in your compressed log files: $ bunzip2 -k path | head Then you need to modify your code to deal with these fields in the logs: $ vim myjob.py You have a bunch of libraries or modules in the project, but you don't need to worry about them when running distributed jobs; you just need to do: $ python myjob.py If something goes wrong, you can modify myjob.py and save some RDDs to disk, then check the results: $ head path_to_result_of_rdd Maybe something is wrong in your library; fix it, and run again: $ python myjob.py Dump the result as a CSV file, then load it into MySQL: mysql xxx path_of_the_result In summary, shared storage can help a lot in a distributed environment; a simple solution (such as NFS) is a natural way to solve these problems. Set up once, benefit forever. PS: I'm also a contributor to Moose FS, and have a fork at github.com/davies/moosefs/ PPS: I'm sorry for my poor English if the above sounds rude to you. Davies [1] http://moosefs.org/ On Fri, Sep 5, 2014 at 11:22 AM, Dimension Data, LLC. subscripti...@didata.us wrote: I'd have to agree with Marcelo and Andrew here... Favoring a simple Build-and-Run/Submit wrapper-script that leverages '--py-files file.zip' over adding another layer of complexity -- even if seemingly 'trivial' like NFS -- is probably a good approach (... b/c more technology is never 'trivial' over time). =:). Less is more. On 09/05/2014 01:58 PM, Marcelo Vanzin wrote: On Fri, Sep 5, 2014 at 10:50 AM, Davies Liu dav...@databricks.com wrote: In daily development, it's common to modify your projects and re-run the jobs. If using zip or egg to package your code, you need to do this every time after modification, I think it will be boring. That's why shell scripts were invented. :-) Probably a lot easier than setting up and maintaining shared storage in a large cluster. -- Sincerely yours, Team Dimension Data Dimension Data, LLC. | www.didata.us P: 212.882.1276 | subscripti...@didata.us Data Analytics you can literally count on. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PySpark on Yarn a lot of python scripts project
Hi Davies, On Fri, Sep 5, 2014 at 1:04 PM, Davies Liu dav...@databricks.com wrote: In Douban, we use Moose FS[1] instead of HDFS as the distributed file system, it's POSIX compatible and can be mounted just as NFS. Sure, if you already have the infrastructure in place, it might be worthwhile to use it. After all, it's already there and you have people to keep it working and set things up when you need to add new machines to your cluster. But what I think we're questioning is proposing that as a general solution when a 2-line shell script and an existing feature in spark-submit will most probably solve the problem. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Repartition inefficient
I wonder if anyone has any tips for using repartition? It seems that when you call the repartition method, the entire RDD gets split up, shuffled, and redistributed... This is an extremely heavy task if you have a large HDFS dataset and all you want to do is make sure your RDD is balanced / data skew is minimal... I have tried coalesce(shuffle=false), but this seems to be somewhat ineffective at balancing the blocks. Care to share your experiences? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Repartition-inefficient-tp13587.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
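For anyone weighing the two calls, a minimal Scala sketch of the trade-off; the path and partition counts are placeholders, and sc is an assumed SparkContext:

val raw = sc.textFile("hdfs:///data/large-input") // assumed dataset

// repartition always does a full shuffle, producing evenly sized partitions
// at the cost of moving the whole dataset across the network.
val rebalanced = raw.repartition(400)

// coalesce with shuffle = false only merges co-located partitions, so it is
// much cheaper but cannot fix skew between the resulting partitions.
val merged = raw.coalesce(400, shuffle = false)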
Re: Shared variable in Spark Streaming
Good explanation, Chris :) On Fri, Sep 5, 2014 at 12:42 PM, Chris Fregly ch...@fregly.com wrote: good question, soumitra. it's a bit confusing. to break TD's code down a bit: dstream.count() is a transformation operation (returns a new DStream), executes lazily, runs in the cluster on the underlying RDDs that come through in that batch, and returns a new DStream with a single element representing the count of the underlying RDDs in each batch. dstream.foreachRDD() is an output/action operation (returns something other than a DStream - nothing in this case), triggers the lazy execution above, returns the results to the driver, and increments the globalCount locally in the driver. per your specific question, RDD.count() is different in that it's an output/action operation that materializes the RDD and collects the count of elements in the RDD locally in the driver. confusing, indeed. accumulators are updated in parallel on the worker nodes across the cluster and are read locally in the driver. On Fri, Aug 8, 2014 at 7:36 AM, Soumitra Kumar kumar.soumi...@gmail.com wrote: I want to keep track of the events processed in a batch. How come 'globalCount' works for a DStream? I think a similar construct won't work for an RDD; that's why there is the accumulator. On Fri, Aug 8, 2014 at 12:52 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Do you mean that you want a continuously updated count as more events/records are received in the DStream (remember, a DStream is a continuous stream of data)? Assuming that is what you want, you can use a global counter var globalCount = 0L dstream.count().foreachRDD(rdd => { globalCount += rdd.first() }) This globalCount variable will reside in the driver and will keep being updated after every batch. TD On Thu, Aug 7, 2014 at 10:16 PM, Soumitra Kumar kumar.soumi...@gmail.com wrote: Hello, I want to count the number of elements in the DStream, like RDD.count(). Since there is no such method in DStream, I thought of using DStream.count and using an accumulator. How do I do DStream.count() to count the number of elements in a DStream? How do I create a shared variable in Spark Streaming? -Soumitra.
Re: Running spark-shell (or queries) over the network (not from master)
I think that should be possible. Make sure spark is installed on your local machine and is the same version as on the cluster. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-shell-or-queries-over-the-network-not-from-master-tp13543p13590.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running spark-shell (or queries) over the network (not from master)
On 9/5/2014 3:27 PM, anthonyjschu...@gmail.com wrote: I think that should be possible. Make sure spark is installed on your local machine and is the same version as on the cluster. It is the same version, I can telnet to master:7077 but when I run the spark-shell it times out. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Sparse Matrices support in Spark
You are right, but unfortunately the size of the matrices that I am trying to build will exceed the capacity of the machines that I have access to; that is why I need sparse libraries. I wish there were a good library for sparse matrices in Java; I will try to check the Scala one that you mentioned. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sparse-Matrices-support-in-Spark-tp13548p13592.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running spark-shell (or queries) over the network (not from master)
The command should be: spark-shell --master spark://<master ip on EC2>:7077 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-shell-or-queries-over-the-network-not-from-master-tp13543p13593.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running spark-shell (or queries) over the network (not from master)
That is the command I ran and it still times out. Besides 7077, is there any other port that needs to be open? Thanks! Ognen On 9/5/2014 4:10 PM, qihong wrote: The command should be: spark-shell --master spark://<master ip on EC2>:7077 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-shell-or-queries-over-the-network-not-from-master-tp13543p13593.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running spark-shell (or queries) over the network (not from master)
Since you are using your home computer, it's probably not reachable by EC2 from the internet. You can try setting spark.driver.host to your WAN IP and spark.driver.port to a fixed port in SparkConf, and open that port in your home network (port forwarding to the computer you are using). See if that helps. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-shell-or-queries-over-the-network-not-from-master-tp13543p13595.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
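A sketch of those settings in Scala; the WAN address, port, and master URL below are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// The executors on EC2 connect back to the driver, so the driver must
// advertise an address and port that are reachable from the cluster.
val conf = new SparkConf()
  .setMaster("spark://<ec2-master-ip>:7077")
  .setAppName("remote-driver-test")
  .set("spark.driver.host", "203.0.113.5") // your WAN IP (placeholder)
  .set("spark.driver.port", "7777")        // forwarded through your home router

val sc = new SparkContext(conf)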
Re: Running spark-shell (or queries) over the network (not from master)
Ah. So there is some kind of a back and forth going on. Thanks! Ognen On 9/5/2014 5:34 PM, qihong wrote: Since you are using your home computer, so it's probably not reachable by EC2 from internet. You can try to set spark.driver.host to your WAN ip, spark.driver.port to a fixed port in SparkConf, and open that port in your home network (port forwarding to the computer you are using). see if that helps. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-shell-or-queries-over-the-network-not-from-master-tp13543p13595.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
prepending jars to the driver class path for spark-submit on YARN
Hey - I'm struggling with some dependency issues with org.apache.httpcomponents httpcore and httpclient when using spark-submit with YARN running Spark 1.0.2 on a Hadoop 2.2 cluster. I've seen several posts about this issue, but no resolution. The error message is this: Caused by: java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:93) at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26) at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96) at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:155) at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:118) at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:102) at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:332) at com.oncue.rna.realtime.streaming.config.package$.transferManager(package.scala:76) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry.<init>(SchemaRegistry.scala:27) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry$lzycompute(SchemaRegistry.scala:46) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry(SchemaRegistry.scala:44) at com.oncue.rna.realtime.streaming.coders.KafkaAvroDecoder.<init>(KafkaAvroDecoder.scala:20) ... 17 more The apache httpcomponents libraries include the method above as of version 4.2. The Spark 1.0.2 binaries seem to include version 4.1. I can get this to work in my driver program by adding exclusions to force use of 4.1, but then I get the error in tasks even when using the --jars option of the spark-submit command. How can I get both the driver program and the individual tasks in my spark-streaming job to use the same version of this library so my job will run all the way through? thanks p
Re: spark 1.1.0 requested array size exceed vm limits
I set it to 200, but it still failed in the second step (map and mapPartition in the web UI). In the Spark 1.0.2 stable version it works well in the first step, with the same configuration as 1.1.0. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Huge matrix
Hi Reza, Have you compared with the brute force algorithm for similarity computation with something like the following in Spark ? https://github.com/echen/scaldingale I am adding cosine similarity computation but I do want to compute an all pair similarities... Note that the data is sparse for me (the data that goes to matrix factorization) so I don't think joining and group-by on (product,product) will be a big issue for me... Does it make sense to add all pair similarities as well with dimsum based similarity ? Thanks. Deb On Fri, Apr 11, 2014 at 9:21 PM, Reza Zadeh r...@databricks.com wrote: Hi Xiaoli, There is a PR currently in progress to allow this, via the sampling scheme described in this paper: stanford.edu/~rezab/papers/dimsum.pdf The PR is at https://github.com/apache/spark/pull/336 though it will need refactoring given the recent changes to matrix interface in MLlib. You may implement the sampling scheme for your own app since it's much code. Best, Reza On Fri, Apr 11, 2014 at 9:17 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi Andrew, Thanks for your suggestion. I have tried the method. I used 8 nodes and every node has 8G memory. The program just stopped at a stage for about several hours without any further information. Maybe I need to find out a more efficient way. On Fri, Apr 11, 2014 at 5:24 PM, Andrew Ash and...@andrewash.com wrote: The naive way would be to put all the users and their attributes into an RDD, then cartesian product that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)) and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs though, so it might be worth looking at the literature for recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some user's attributes. For each user, I need to get top k most similar users. What is the best way to implement this? Thanks.
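For reference, a Scala sketch of the naive cartesian approach described in the quoted thread; all names, the input path, and the use of dense arrays are assumptions, and it scales as O(n^2) pairs, so it is only workable for modest numbers of rows:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical input: (userId, dense feature vector) pairs.
val users: RDD[(Long, Array[Double])] =
  sc.objectFile[(Long, Array[Double])]("hdfs:///data/user-features")

def cosine(a: Array[Double], b: Array[Double]): Double = {
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val na  = math.sqrt(a.map(x => x * x).sum)
  val nb  = math.sqrt(b.map(x => x * x).sum)
  if (na == 0.0 || nb == 0.0) 0.0 else dot / (na * nb)
}

// All pairs via cartesian, then keep the top 10 most similar users per user.
val topK = users.cartesian(users)
  .filter { case ((i, _), (j, _)) => i != j }
  .map { case ((i, vi), (j, vj)) => (i, (cosine(vi, vj), j)) }
  .groupByKey()
  .mapValues(_.toSeq.sortBy(-_._1).take(10))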
Re: Huge matrix
Hi Deb, We are adding all-pairs and thresholded all-pairs via dimsum in this PR: https://github.com/apache/spark/pull/1778 Your question wasn't entirely clear - does this answer it? Best, Reza On Fri, Sep 5, 2014 at 6:14 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Reza, Have you compared with the brute force algorithm for similarity computation with something like the following in Spark ? https://github.com/echen/scaldingale I am adding cosine similarity computation but I do want to compute an all pair similarities... Note that the data is sparse for me (the data that goes to matrix factorization) so I don't think joining and group-by on (product,product) will be a big issue for me... Does it make sense to add all pair similarities as well with dimsum based similarity ? Thanks. Deb On Fri, Apr 11, 2014 at 9:21 PM, Reza Zadeh r...@databricks.com wrote: Hi Xiaoli, There is a PR currently in progress to allow this, via the sampling scheme described in this paper: stanford.edu/~rezab/papers/dimsum.pdf The PR is at https://github.com/apache/spark/pull/336 though it will need refactoring given the recent changes to matrix interface in MLlib. You may implement the sampling scheme for your own app since it's much code. Best, Reza On Fri, Apr 11, 2014 at 9:17 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi Andrew, Thanks for your suggestion. I have tried the method. I used 8 nodes and every node has 8G memory. The program just stopped at a stage for about several hours without any further information. Maybe I need to find out a more efficient way. On Fri, Apr 11, 2014 at 5:24 PM, Andrew Ash and...@andrewash.com wrote: The naive way would be to put all the users and their attributes into an RDD, then cartesian product that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)) and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs though, so it might be worth looking at the literature for recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some user's attributes. For each user, I need to get top k most similar users. What is the best way to implement this? Thanks.
Re: Huge matrix
Ohh coolall-pairs brute force is also part of this PR ? Let me pull it in and test on our dataset... Thanks. Deb On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh r...@databricks.com wrote: Hi Deb, We are adding all-pairs and thresholded all-pairs via dimsum in this PR: https://github.com/apache/spark/pull/1778 Your question wasn't entirely clear - does this answer it? Best, Reza On Fri, Sep 5, 2014 at 6:14 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Reza, Have you compared with the brute force algorithm for similarity computation with something like the following in Spark ? https://github.com/echen/scaldingale I am adding cosine similarity computation but I do want to compute an all pair similarities... Note that the data is sparse for me (the data that goes to matrix factorization) so I don't think joining and group-by on (product,product) will be a big issue for me... Does it make sense to add all pair similarities as well with dimsum based similarity ? Thanks. Deb On Fri, Apr 11, 2014 at 9:21 PM, Reza Zadeh r...@databricks.com wrote: Hi Xiaoli, There is a PR currently in progress to allow this, via the sampling scheme described in this paper: stanford.edu/~rezab/papers/dimsum.pdf The PR is at https://github.com/apache/spark/pull/336 though it will need refactoring given the recent changes to matrix interface in MLlib. You may implement the sampling scheme for your own app since it's much code. Best, Reza On Fri, Apr 11, 2014 at 9:17 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi Andrew, Thanks for your suggestion. I have tried the method. I used 8 nodes and every node has 8G memory. The program just stopped at a stage for about several hours without any further information. Maybe I need to find out a more efficient way. On Fri, Apr 11, 2014 at 5:24 PM, Andrew Ash and...@andrewash.com wrote: The naive way would be to put all the users and their attributes into an RDD, then cartesian product that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)) and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs though, so it might be worth looking at the literature for recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some user's attributes. For each user, I need to get top k most similar users. What is the best way to implement this? Thanks.
Re: Huge matrix
You might want to wait until Wednesday since the interface will be changing in that PR before Wednesday, probably over the weekend, so that you don't have to redo your code. Your call if you need it before a week. Reza On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das debasish.da...@gmail.com wrote: Ohh coolall-pairs brute force is also part of this PR ? Let me pull it in and test on our dataset... Thanks. Deb On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh r...@databricks.com wrote: Hi Deb, We are adding all-pairs and thresholded all-pairs via dimsum in this PR: https://github.com/apache/spark/pull/1778 Your question wasn't entirely clear - does this answer it? Best, Reza On Fri, Sep 5, 2014 at 6:14 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Reza, Have you compared with the brute force algorithm for similarity computation with something like the following in Spark ? https://github.com/echen/scaldingale I am adding cosine similarity computation but I do want to compute an all pair similarities... Note that the data is sparse for me (the data that goes to matrix factorization) so I don't think joining and group-by on (product,product) will be a big issue for me... Does it make sense to add all pair similarities as well with dimsum based similarity ? Thanks. Deb On Fri, Apr 11, 2014 at 9:21 PM, Reza Zadeh r...@databricks.com wrote: Hi Xiaoli, There is a PR currently in progress to allow this, via the sampling scheme described in this paper: stanford.edu/~rezab/papers/dimsum.pdf The PR is at https://github.com/apache/spark/pull/336 though it will need refactoring given the recent changes to matrix interface in MLlib. You may implement the sampling scheme for your own app since it's much code. Best, Reza On Fri, Apr 11, 2014 at 9:17 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi Andrew, Thanks for your suggestion. I have tried the method. I used 8 nodes and every node has 8G memory. The program just stopped at a stage for about several hours without any further information. Maybe I need to find out a more efficient way. On Fri, Apr 11, 2014 at 5:24 PM, Andrew Ash and...@andrewash.com wrote: The naive way would be to put all the users and their attributes into an RDD, then cartesian product that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)) and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs though, so it might be worth looking at the literature for recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some user's attributes. For each user, I need to get top k most similar users. What is the best way to implement this? Thanks.
Re: Huge matrix
Ok...just to make sure I have RowMatrix[SparseVector] where rows are ~ 60M and columns are 10M say with billion data points... I have another version that's around 60M and ~ 10K... I guess for the second one both all pair and dimsum will run fine... But for tall and wide, what do you suggest ? can dimsum handle it ? I might need jaccard as well...can I plug that in the PR ? On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh r...@databricks.com wrote: You might want to wait until Wednesday since the interface will be changing in that PR before Wednesday, probably over the weekend, so that you don't have to redo your code. Your call if you need it before a week. Reza On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das debasish.da...@gmail.com wrote: Ohh coolall-pairs brute force is also part of this PR ? Let me pull it in and test on our dataset... Thanks. Deb On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh r...@databricks.com wrote: Hi Deb, We are adding all-pairs and thresholded all-pairs via dimsum in this PR: https://github.com/apache/spark/pull/1778 Your question wasn't entirely clear - does this answer it? Best, Reza On Fri, Sep 5, 2014 at 6:14 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Reza, Have you compared with the brute force algorithm for similarity computation with something like the following in Spark ? https://github.com/echen/scaldingale I am adding cosine similarity computation but I do want to compute an all pair similarities... Note that the data is sparse for me (the data that goes to matrix factorization) so I don't think joining and group-by on (product,product) will be a big issue for me... Does it make sense to add all pair similarities as well with dimsum based similarity ? Thanks. Deb On Fri, Apr 11, 2014 at 9:21 PM, Reza Zadeh r...@databricks.com wrote: Hi Xiaoli, There is a PR currently in progress to allow this, via the sampling scheme described in this paper: stanford.edu/~rezab/papers/dimsum.pdf The PR is at https://github.com/apache/spark/pull/336 though it will need refactoring given the recent changes to matrix interface in MLlib. You may implement the sampling scheme for your own app since it's much code. Best, Reza On Fri, Apr 11, 2014 at 9:17 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi Andrew, Thanks for your suggestion. I have tried the method. I used 8 nodes and every node has 8G memory. The program just stopped at a stage for about several hours without any further information. Maybe I need to find out a more efficient way. On Fri, Apr 11, 2014 at 5:24 PM, Andrew Ash and...@andrewash.com wrote: The naive way would be to put all the users and their attributes into an RDD, then cartesian product that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)) and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs though, so it might be worth looking at the literature for recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some user's attributes. For each user, I need to get top k most similar users. What is the best way to implement this? Thanks.
Re: Huge matrix
Also for tall and wide (rows ~60M, columns 10M), I am considering running a matrix factorization to reduce the dimension to say ~60M x 50 and then run all pair similarity... Did you also try similar ideas and saw positive results ? On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das debasish.da...@gmail.com wrote: Ok...just to make sure I have RowMatrix[SparseVector] where rows are ~ 60M and columns are 10M say with billion data points... I have another version that's around 60M and ~ 10K... I guess for the second one both all pair and dimsum will run fine... But for tall and wide, what do you suggest ? can dimsum handle it ? I might need jaccard as well...can I plug that in the PR ? On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh r...@databricks.com wrote: You might want to wait until Wednesday since the interface will be changing in that PR before Wednesday, probably over the weekend, so that you don't have to redo your code. Your call if you need it before a week. Reza On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das debasish.da...@gmail.com wrote: Ohh coolall-pairs brute force is also part of this PR ? Let me pull it in and test on our dataset... Thanks. Deb On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh r...@databricks.com wrote: Hi Deb, We are adding all-pairs and thresholded all-pairs via dimsum in this PR: https://github.com/apache/spark/pull/1778 Your question wasn't entirely clear - does this answer it? Best, Reza On Fri, Sep 5, 2014 at 6:14 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Reza, Have you compared with the brute force algorithm for similarity computation with something like the following in Spark ? https://github.com/echen/scaldingale I am adding cosine similarity computation but I do want to compute an all pair similarities... Note that the data is sparse for me (the data that goes to matrix factorization) so I don't think joining and group-by on (product,product) will be a big issue for me... Does it make sense to add all pair similarities as well with dimsum based similarity ? Thanks. Deb On Fri, Apr 11, 2014 at 9:21 PM, Reza Zadeh r...@databricks.com wrote: Hi Xiaoli, There is a PR currently in progress to allow this, via the sampling scheme described in this paper: stanford.edu/~rezab/papers/dimsum.pdf The PR is at https://github.com/apache/spark/pull/336 though it will need refactoring given the recent changes to matrix interface in MLlib. You may implement the sampling scheme for your own app since it's much code. Best, Reza On Fri, Apr 11, 2014 at 9:17 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi Andrew, Thanks for your suggestion. I have tried the method. I used 8 nodes and every node has 8G memory. The program just stopped at a stage for about several hours without any further information. Maybe I need to find out a more efficient way. On Fri, Apr 11, 2014 at 5:24 PM, Andrew Ash and...@andrewash.com wrote: The naive way would be to put all the users and their attributes into an RDD, then cartesian product that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)) and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs though, so it might be worth looking at the literature for recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some user's attributes. 
For each user, I need to get top k most similar users. What is the best way to implement this? Thanks.
Re: Huge matrix
For 60M x 10K brute force and dimsum thresholding should be fine. For 60M x 10M probably brute force won't work depending on the cluster's power, and dimsum thresholding should work with appropriate threshold. Dimensionality reduction should help, and how effective it is will depend on your application and domain, it's worth trying if the direct computation doesn't work. You can also try running KMeans clustering (perhaps after dimensionality reduction) if your goal is to find batches of similar points instead of all pairs above a threshold. On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das debasish.da...@gmail.com wrote: Also for tall and wide (rows ~60M, columns 10M), I am considering running a matrix factorization to reduce the dimension to say ~60M x 50 and then run all pair similarity... Did you also try similar ideas and saw positive results ? On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das debasish.da...@gmail.com wrote: Ok...just to make sure I have RowMatrix[SparseVector] where rows are ~ 60M and columns are 10M say with billion data points... I have another version that's around 60M and ~ 10K... I guess for the second one both all pair and dimsum will run fine... But for tall and wide, what do you suggest ? can dimsum handle it ? I might need jaccard as well...can I plug that in the PR ? On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh r...@databricks.com wrote: You might want to wait until Wednesday since the interface will be changing in that PR before Wednesday, probably over the weekend, so that you don't have to redo your code. Your call if you need it before a week. Reza On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das debasish.da...@gmail.com wrote: Ohh coolall-pairs brute force is also part of this PR ? Let me pull it in and test on our dataset... Thanks. Deb On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh r...@databricks.com wrote: Hi Deb, We are adding all-pairs and thresholded all-pairs via dimsum in this PR: https://github.com/apache/spark/pull/1778 Your question wasn't entirely clear - does this answer it? Best, Reza On Fri, Sep 5, 2014 at 6:14 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Reza, Have you compared with the brute force algorithm for similarity computation with something like the following in Spark ? https://github.com/echen/scaldingale I am adding cosine similarity computation but I do want to compute an all pair similarities... Note that the data is sparse for me (the data that goes to matrix factorization) so I don't think joining and group-by on (product,product) will be a big issue for me... Does it make sense to add all pair similarities as well with dimsum based similarity ? Thanks. Deb On Fri, Apr 11, 2014 at 9:21 PM, Reza Zadeh r...@databricks.com wrote: Hi Xiaoli, There is a PR currently in progress to allow this, via the sampling scheme described in this paper: stanford.edu/~rezab/papers/dimsum.pdf The PR is at https://github.com/apache/spark/pull/336 though it will need refactoring given the recent changes to matrix interface in MLlib. You may implement the sampling scheme for your own app since it's much code. Best, Reza On Fri, Apr 11, 2014 at 9:17 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi Andrew, Thanks for your suggestion. I have tried the method. I used 8 nodes and every node has 8G memory. The program just stopped at a stage for about several hours without any further information. Maybe I need to find out a more efficient way. 
On Fri, Apr 11, 2014 at 5:24 PM, Andrew Ash and...@andrewash.com wrote: The naive way would be to put all the users and their attributes into an RDD, then cartesian product that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)) and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs though, so it might be worth looking at the literature for recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some user's attributes. For each user, I need to get top k most similar users. What is the best way to implement this? Thanks.
Re: Huge matrix
I looked at the code: similarColumns(Double.posInf) is generating the brute force... Basically dimsum with gamma as PositiveInfinity will produce the exact same result as doing catesian products of RDD[(product, vector)] and computing similarities or there will be some approximation ? Sorry I have not read your paper yet. Will read it over the weekend. On Fri, Sep 5, 2014 at 8:13 PM, Reza Zadeh r...@databricks.com wrote: For 60M x 10K brute force and dimsum thresholding should be fine. For 60M x 10M probably brute force won't work depending on the cluster's power, and dimsum thresholding should work with appropriate threshold. Dimensionality reduction should help, and how effective it is will depend on your application and domain, it's worth trying if the direct computation doesn't work. You can also try running KMeans clustering (perhaps after dimensionality reduction) if your goal is to find batches of similar points instead of all pairs above a threshold. On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das debasish.da...@gmail.com wrote: Also for tall and wide (rows ~60M, columns 10M), I am considering running a matrix factorization to reduce the dimension to say ~60M x 50 and then run all pair similarity... Did you also try similar ideas and saw positive results ? On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das debasish.da...@gmail.com wrote: Ok...just to make sure I have RowMatrix[SparseVector] where rows are ~ 60M and columns are 10M say with billion data points... I have another version that's around 60M and ~ 10K... I guess for the second one both all pair and dimsum will run fine... But for tall and wide, what do you suggest ? can dimsum handle it ? I might need jaccard as well...can I plug that in the PR ? On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh r...@databricks.com wrote: You might want to wait until Wednesday since the interface will be changing in that PR before Wednesday, probably over the weekend, so that you don't have to redo your code. Your call if you need it before a week. Reza On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das debasish.da...@gmail.com wrote: Ohh coolall-pairs brute force is also part of this PR ? Let me pull it in and test on our dataset... Thanks. Deb On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh r...@databricks.com wrote: Hi Deb, We are adding all-pairs and thresholded all-pairs via dimsum in this PR: https://github.com/apache/spark/pull/1778 Your question wasn't entirely clear - does this answer it? Best, Reza On Fri, Sep 5, 2014 at 6:14 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Reza, Have you compared with the brute force algorithm for similarity computation with something like the following in Spark ? https://github.com/echen/scaldingale I am adding cosine similarity computation but I do want to compute an all pair similarities... Note that the data is sparse for me (the data that goes to matrix factorization) so I don't think joining and group-by on (product,product) will be a big issue for me... Does it make sense to add all pair similarities as well with dimsum based similarity ? Thanks. Deb On Fri, Apr 11, 2014 at 9:21 PM, Reza Zadeh r...@databricks.com wrote: Hi Xiaoli, There is a PR currently in progress to allow this, via the sampling scheme described in this paper: stanford.edu/~rezab/papers/dimsum.pdf The PR is at https://github.com/apache/spark/pull/336 though it will need refactoring given the recent changes to matrix interface in MLlib. You may implement the sampling scheme for your own app since it's much code. 
Best, Reza On Fri, Apr 11, 2014 at 9:17 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi Andrew, Thanks for your suggestion. I have tried the method. I used 8 nodes and every node has 8G memory. The program just stopped at a stage for about several hours without any further information. Maybe I need to find out a more efficient way. On Fri, Apr 11, 2014 at 5:24 PM, Andrew Ash and...@andrewash.com wrote: The naive way would be to put all the users and their attributes into an RDD, then cartesian product that with itself. Run the similarity score on every pair (1M * 1M = 1T scores), map to (user, (score, otherUser)) and take the .top(k) for each user. I doubt that you'll be able to take this approach with the 1T pairs though, so it might be worth looking at the literature for recommender systems to see what else is out there. On Fri, Apr 11, 2014 at 9:54 PM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi all, I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some user's attributes. For each user, I need to get top k most similar users. What is the best way to implement this? Thanks.
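For context, a sketch of what that call looks like against the RowMatrix interface this PR eventually became (the merged method is named columnSimilarities rather than the similarColumns discussed here); the input path, dimensions, and line format are assumptions, and sc is an assumed SparkContext:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Assumed format: each line holds space-separated "colIndex:value" pairs for one sparse row.
val rows = sc.textFile("hdfs:///data/ratings.txt").map { line =>
  val entries = line.split(" ").map { t =>
    val Array(i, v) = t.split(":")
    (i.toInt, v.toDouble)
  }
  Vectors.sparse(10000000, entries) // assumed number of columns
}

val mat = new RowMatrix(rows)

// Exact all-pairs cosine similarities (the brute-force case discussed above).
val exact = mat.columnSimilarities()

// DIMSUM sampling: column pairs with similarity above roughly this threshold
// are estimated reliably, at a fraction of the brute-force shuffle cost.
val approx = mat.columnSimilarities(0.1)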
Re: Huge matrix
Yes, you're right: calling dimsum with gamma as PositiveInfinity turns it into the usual brute force algorithm for cosine similarity; there is no sampling. This is by design.
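[Editor's note: for concreteness, a short sketch of what this looks like through the column-similarity API as it later landed in MLlib's RowMatrix; the method names in the PR under discussion may differ, so treat this as illustrative. Calling it with no threshold gives exact brute-force cosine similarities between columns, while a positive threshold enables DIMSUM sampling. The sc below is the spark-shell SparkContext, and the toy matrix stands in for the real data.]

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, RowMatrix}

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 2.0),
  Vectors.dense(0.0, 3.0, 4.0),
  Vectors.dense(5.0, 6.0, 0.0)
))
val mat = new RowMatrix(rows)

// Exact all-pairs cosine similarity between columns (brute force, no sampling).
val exact: CoordinateMatrix = mat.columnSimilarities()

// DIMSUM-sampled similarities: column pairs unlikely to exceed the threshold
// are skipped, trading a small approximation for much less shuffle.
val approx: CoordinateMatrix = mat.columnSimilarities(threshold = 0.5)

approx.entries.take(5).foreach(println)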
Re: Huge matrix
Awesome... Let me try it out... Any plans for putting other similarity measures in a future release (jaccard is something that will be useful)? I guess it makes sense to add some similarity measures to mllib...
Re: Huge matrix
I will add dice, overlap, and jaccard similarity in a future PR, probably still for 1.2.
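[Editor's note: until such measures land, a hand-rolled Jaccard between columns of a sparse 0/1 matrix is short to write. A sketch follows, assuming the nonzero entries are available as hypothetical (rowId, colId) pairs; note the self-join can blow up on very dense rows.]

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Jaccard similarity between columns of a sparse 0/1 matrix, given its nonzero
// entries as (rowId, colId) pairs. Intersections are counted via a self-join on
// rowId; union = |A| + |B| - intersection.
def jaccardColumns(entries: RDD[(Long, Int)]): RDD[((Int, Int), Double)] = {
  val colSizes = entries.map { case (_, c) => (c, 1L) }.reduceByKey(_ + _)

  val intersections = entries.join(entries)               // column pairs sharing a row
    .filter { case (_, (c1, c2)) => c1 < c2 }              // keep each unordered pair once
    .map { case (_, pair) => (pair, 1L) }
    .reduceByKey(_ + _)

  val sizes = colSizes.collectAsMap()                      // broadcast this if #columns is large
  intersections.map { case ((c1, c2), inter) =>
    ((c1, c2), inter.toDouble / (sizes(c1) + sizes(c2) - inter))
  }
}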
Array and RDDs
Hi, I have an input file which consists of src_node dest_node pairs. I have created an RDD of key-value pairs where the key is the node id and the values are the children of that node. Now I want to associate a byte with each node. For that I have created a byte array. Every time I print out the key-value pairs in the RDD, they do not come out in the same order. Because of this I am finding it difficult to assign the byte values to each node. Can anyone help me out with this? I basically have the following code: val bitarray = Array.fill[Byte](number)(0) And I want to associate each byte in the array with a node. How should I do that? Thank you
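[Editor's note: RDDs give no ordering guarantee across operations, so indexing into a local byte array by position is fragile. One sketch of an alternative, with all names illustrative, is to key the byte by the node id itself so the association travels with the data.]

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// graph: (nodeId, children) pairs, as described above.
// Keep the byte keyed by node id instead of relying on a positional Array[Byte];
// the association then survives shuffles and repartitioning.
def withFlags(graph: RDD[(Long, Seq[Long])]): RDD[(Long, (Seq[Long], Byte))] =
  graph.mapValues(children => (children, 0.toByte))   // every node starts with flag 0

// Updating a flag is then a keyed transformation, not an array write:
def setFlag(nodes: RDD[(Long, (Seq[Long], Byte))], target: Long, value: Byte): RDD[(Long, (Seq[Long], Byte))] =
  nodes.map {
    case (id, (children, _)) if id == target => (id, (children, value))
    case other                               => other
  }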