Re: IP to geo information in spark streaming application
1) I think a library-based solution is the better idea; we used that, and it works.
2) We used a broadcast variable, and it works.

qinwei

From: Noam Kfir
Date: 2014-12-02 23:14
To: user@spark.apache.org
Subject: IP to geo information in spark streaming application

Hi,

I'm new to Spark Streaming. I'm currently writing a Spark Streaming application to standardize events coming from Kinesis. As part of the logic, I want to use an IP-to-geo library or service.

My questions:
1) If I used a REST service for this task, do you think it would cause a performance penalty (compared to a library-based solution)?
2) If I used a library-based solution, I would have to use a local db file. What mechanism should I use in order to transfer such a db file? A broadcast variable?

Tx, Noam.
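A minimal sketch of the broadcast approach, assuming a hypothetical GeoIpDatabase class that stands in for whatever GeoIP library is chosen, and events that expose an ip field (the db file path is illustrative):

import java.nio.file.{Files, Paths}

// Read the local db file on the driver and broadcast its bytes to the executors.
val dbBytes = Files.readAllBytes(Paths.get("/path/to/GeoLite2-City.mmdb"))  // illustrative path
val dbBroadcast = sc.broadcast(dbBytes)

val enriched = events.mapPartitions { iter =>
  // Open the database once per partition from the broadcast bytes,
  // not once per record.
  val geoDb = GeoIpDatabase.fromBytes(dbBroadcast.value)  // hypothetical GeoIP library API
  iter.map(event => (event, geoDb.lookup(event.ip)))      // hypothetical record field and lookup
}

This keeps the lookups local to each executor, which is also why a library-based solution tends to avoid the per-record latency of a REST call.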
Re: Pass RDD to functions
I think it's ok; feel free to treat an RDD like a common object.

qinwei

From: Deep Pradhan
Date: 2014-11-12 18:24
To: user@spark.apache.org
Subject: Pass RDD to functions

Hi,
Can we pass an RDD to functions? Like, can we do the following?

def func(temp: RDD[String]): RDD[String] = {
  // body of the function
}

Thank You
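For example, a minimal sketch of a function that takes and returns an RDD (the body shown is just an illustration):

import org.apache.spark.rdd.RDD

// An RDD is an ordinary, lazily evaluated object, so it can be passed
// to and returned from functions like any other value.
def toUpper(temp: RDD[String]): RDD[String] =
  temp.map(_.toUpperCase)

val result = toUpper(sc.textFile("hdfs://host:port/path"))  // illustrative path

Nothing is computed until an action is called on the returned RDD; passing it around only builds up the lineage.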
why flatmap has shuffle
Hi, everyone!

I consider flatMap a narrow dependency, so why does it have a shuffle, as shown on the web UI? My code is as below:

val transferRDD = sc.textFile("hdfs://host:port/path")

val rdd = transferRDD.map(line => {
    val trunks = line.split("\t")
    if (trunks.length == 32) {
      (trunks(11), trunks(13), Try(java.lang.Long.parseLong(trunks(9))).getOrElse(0L), trunks(14), trunks(19))
    }
  })
  .filter(arg => arg != ())
  .map(arg => arg.asInstanceOf[(String, String, Long, String, String)])
  .filter(arg => arg._3 != 0)

val flatMappedRDD = rdd.flatMap(arg => List((arg._1, (arg._2, arg._3, 1)), (arg._2, (arg._1, arg._3, 0

Thanks for your help!

qinwei
Re: Joined RDD
I think it is because A.join(B) is a shuffle map stage, whose result is stored temporarily (I'm not sure whether in memory or on disk). I saw the words "map output" in the log of my Spark application; I think it is the intermediate result of my application, and according to the log, it is stored.

qinwei

From: ajay garg
Date: 2014-11-13 14:56
To: user
Subject: Joined RDD

Hi,
I have two RDDs A and B which are created by reading files from HDFS. I have a third RDD C which is created by taking the join of A and B. All three RDDs (A, B and C) are not cached. Now if I perform any action on C (say collect), the action is served without reading any data from disk. Since no data is cached in Spark, how is the action on C served without reading data from disk?

Thanks
--Ajay

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joined-RDD-tp18820.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Re: about write mongodb in mapPartitions
Thanks for your reply! According to your hint, the code should be like this:

// I want to save the data in rdd to both MongoDB and HDFS
rdd.saveAsNewAPIHadoopFile()
rdd.saveAsTextFile()

But will the application read HDFS twice?

qinwei

From: Akhil Das
Date: 2014-11-07 18:32
To: qinwei
CC: user
Subject: Re: about write mongodb in mapPartitions

Why not saveAsNewAPIHadoopFile?

// Define your mongoDB confs
val config = new Configuration()
config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/sigmoid.output")

// Write everything to mongo
rdd.saveAsNewAPIHadoopFile("file:///some/random", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)

Thanks
Best Regards

On Fri, Nov 7, 2014 at 2:53 PM, qinwei <wei@dewmobile.net> wrote:

Hi, everyone

I have come across a problem with writing data to MongoDB in mapPartitions; my code is as below:

val sourceRDD = sc.textFile("hdfs://host:port/sourcePath")
// some transformations
val rdd = sourceRDD.map(mapFunc).filter(filterFunc)
val newRDD = rdd.mapPartitions(args => {
  val mongoClient = new MongoClient(host, port)
  val db = mongoClient.getDB("db")
  val coll = db.getCollection("collectionA")
  args.map(arg => {
    coll.insert(new BasicDBObject("pkg", arg))
    arg
  })
  mongoClient.close()
  args
})
newRDD.saveAsTextFile("hdfs://host:port/path")

The application saved data to HDFS correctly, but not to MongoDB; is there something wrong? I know that collecting newRDD to the driver and then saving it to MongoDB will succeed, but will the following saveAsTextFile read the filesystem once again?

Thanks

qinwei
Re: Re: about write mongodb in mapPartitions
Thanks for your reply! As you mentioned, the insert clause is not executed because the results of args.map are never used anywhere; after I modified the code, it works.

qinwei

From: Tobias Pfeiffer
Date: 2014-11-07 18:04
To: qinwei
CC: user
Subject: Re: about write mongodb in mapPartitions

Hi,

On Fri, Nov 7, 2014 at 6:23 PM, qinwei <wei@dewmobile.net> wrote:

args.map(arg => {
  coll.insert(new BasicDBObject("pkg", arg))
  arg
})
mongoClient.close()
args

As the results of args.map are never used anywhere, I think the loop body is not executed at all. Maybe try:

val argsProcessed = args.map(arg => {
  coll.insert(new BasicDBObject("pkg", arg))
  arg
})
mongoClient.close()
argsProcessed

Tobias
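A minimal sketch of one way the snippet could be fixed (an assumption; the exact change qinwei made is not shown in the thread): since args is an iterator and map is lazy, the mapped results can be materialized before the client is closed:

val newRDD = rdd.mapPartitions { args =>
  val mongoClient = new MongoClient(host, port)  // host/port as in the original snippet
  val coll = mongoClient.getDB("db").getCollection("collectionA")
  // Force the inserts to happen now by materializing the iterator...
  val processed = args.map { arg =>
    coll.insert(new BasicDBObject("pkg", arg))
    arg
  }.toList
  // ...so the client is only closed after all inserts have run.
  mongoClient.close()
  processed.iterator
}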
about write mongodb in mapPartitions
Hi, everyone

I have come across a problem with writing data to MongoDB in mapPartitions; my code is as below:

val sourceRDD = sc.textFile("hdfs://host:port/sourcePath")
// some transformations
val rdd = sourceRDD.map(mapFunc).filter(filterFunc)
val newRDD = rdd.mapPartitions(args => {
  val mongoClient = new MongoClient(host, port)
  val db = mongoClient.getDB("db")
  val coll = db.getCollection("collectionA")
  args.map(arg => {
    coll.insert(new BasicDBObject("pkg", arg))
    arg
  })
  mongoClient.close()
  args
})
newRDD.saveAsTextFile("hdfs://host:port/path")

The application saved data to HDFS correctly, but not to MongoDB; is there something wrong? I know that collecting newRDD to the driver and then saving it to MongoDB will succeed, but will the following saveAsTextFile read the filesystem once again?

Thanks

qinwei
about aggregateByKey and standard deviation
Hi, everyone

I have an RDD filled with data like

(k1, v11)
(k1, v12)
(k1, v13)
(k2, v21)
(k2, v22)
(k2, v23)
...

I want to calculate the average and standard deviation of (v11, v12, v13) and of (v21, v22, v23), grouped by their keys. For the moment I have done that using groupByKey and map, but I notice that groupByKey is very expensive, and I cannot figure out how to do it with aggregateByKey. Is there a better way to do this?

Thanks!

qinwei
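A minimal sketch of the aggregateByKey approach, assuming the values are plain Double numbers: accumulate a (count, sum, sum of squares) triple per key, then derive the mean and population standard deviation from it.

// rdd: RDD[(String, Double)], key/value pairs as in the example above
val stats = rdd
  .aggregateByKey((0L, 0.0, 0.0))(                       // accumulator: (count, sum, sum of squares)
    (acc, v) => (acc._1 + 1, acc._2 + v, acc._3 + v * v), // fold one value into a partition's accumulator
    (a, b)   => (a._1 + b._1, a._2 + b._2, a._3 + b._3)   // merge accumulators across partitions
  )
  .mapValues { case (n, sum, sumSq) =>
    val mean = sum / n
    val stddev = math.sqrt(sumSq / n - mean * mean)       // population standard deviation
    (mean, stddev)
  }

Unlike groupByKey, this only shuffles the small per-key triples instead of every value.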
problem with data locality api
Hi, everyone

I have come across a problem with the data locality API. I found this example code in "Spark-on-YARN-A-Deep-Dive-Sandy-Ryza.pdf":

val locData = InputFormatInfo.computePreferredLocations(Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))

val sc = new SparkContext(conf, locData)

But I found that the two confs above are of different types: conf in the first line is of type org.apache.hadoop.conf.Configuration, and conf in the second line is of type SparkConf. Can anyone explain that to me or give me some example code?

qinwei
problem with partitioning
Hi, everyone

I have come across a problem with changing the partition number of an RDD; my code is as below:

val rdd1 = sc.textFile(path1)
val rdd2 = sc.textFile(path2)
val rdd3 = sc.textFile(path3)
val imeiList = parseParam(job.jobParams)
val broadcastVar = sc.broadcast(imeiList)

val structuredRDD1 = rdd1.map(line => {
  val trunks = line.split("\t")
  if (trunks.length == 35) {
    (trunks(6).trim, trunks(7).trim, trunks(3).trim, trunks(5).trim, trunks(12).trim, trunks(13).trim.toLong)
  }
})

val structuredRDD2 = rdd2.map(line => {
  val trunks = line.split("\t")
  if (trunks.length == 33) {
    (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)
  }
})

val structuredRDD3 = rdd3.map(line => {
  val trunks = line.split("\t")
  if (trunks.length == 33) {
    (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)
  }
})

val unionedRDD = structuredRDD1.union(structuredRDD2).union(structuredRDD3)

val resRDD = unionedRDD.filter(arg => arg != null && arg != ())
  .map(arg => arg.asInstanceOf[(String, String, String, String, String, Long)])
  .filter(arg => imeiFilter(arg._1, broadcastVar.value, 0) || imeiFilter(arg._2, broadcastVar.value, 0))

val jsonStrRDD = resRDD.map(arg =>
  "{\"f_imei\" : \"" + arg._1 + "\", \"t_imei\" : \"" + arg._2 + "\", \"dgst\" : \"" + arg._3 + "\", \"n\" : \"" + arg._4 + "\", \"s\" : " + arg._5.toString() + ", \"ts\" : " + arg._6.toString() + "}")

val jsonArray = jsonStrRDD.collect

I noticed that there are 3834 tasks by default, and 3834 is the number of files in path1, path2 and path3. I wanted to change the number of partitions with the code below:

val rdd1 = sc.textFile(path1, 1920)
val rdd2 = sc.textFile(path2, 1920)
val rdd3 = sc.textFile(path3, 1920)

By doing this I expected 1920 tasks in total, but the number of tasks becomes 8920. Any idea what's going on here?

Thanks!

qinwei
Re: RE: problem with data locality api
Thank you for your reply.

I understand your explanation, but I wonder what the correct usage of the API

new SparkContext(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]])

is. How do I construct the second param, preferredNodeLocationData? Hope for your reply!

qinwei

From: Shao, Saisai
Sent: 2014-09-28 14:42
To: qinwei
CC: user
Subject: RE: problem with data locality api

Hi,

The first conf is used by Hadoop to determine the locality distribution of the HDFS file. The second conf is used by Spark; though they have the same name, they are actually two different classes.

Thanks
Jerry

From: qinwei [mailto:wei@dewmobile.net]
Sent: Sunday, September 28, 2014 2:05 PM
To: user
Subject: problem with data locality api

Hi, everyone

I have come across a problem with the data locality API. I found this example code in "Spark-on-YARN-A-Deep-Dive-Sandy-Ryza.pdf":

val locData = InputFormatInfo.computePreferredLocations(Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))

val sc = new SparkContext(conf, locData)

But I found that the two confs above are of different types: conf in the first line is of type org.apache.hadoop.conf.Configuration, and conf in the second line is of type SparkConf. Can anyone explain that to me or give me some example code?

qinwei
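A minimal sketch of how the two configurations could be kept apart, following the snippet from the PDF and assuming a Spark 1.x SparkContext that still accepts preferredNodeLocationData (imports and the file name are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.InputFormatInfo

// Hadoop configuration: only used to compute where the input splits live.
val hadoopConf = new Configuration()
val locData = InputFormatInfo.computePreferredLocations(
  Seq(new InputFormatInfo(hadoopConf, classOf[TextInputFormat], new Path("myfile.txt"))))

// Spark configuration: a completely different class, used to build the context.
val sparkConf = new SparkConf().setAppName("locality-example")
val sc = new SparkContext(sparkConf, locData)

The preferredNodeLocationData map is simply the return value of computePreferredLocations; it does not need to be built by hand.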
Re: Re: problem with partitioning
Thank you for your reply. Your tips on code refactoring are helpful; after a second look at the code, the casts and null check really are unnecessary.

qinwei

From: Sean Owen
Date: 2014-09-28 15:03
To: qinwei
CC: user
Subject: Re: problem with partitioning

(Most of this code is not relevant to the question and could be refactored too. The casts and null checks look unnecessary.)

You are unioning RDDs, so you end up with the sum of their partitions. The number of partitions is really only a hint to Hadoop, so it is not even necessarily 3 x 1920. Try not specifying the partitions at the source, and instead repartition after the union to reduce the number of partitions.

On Sep 28, 2014 7:36 AM, qinwei <wei@dewmobile.net> wrote:

Hi, everyone

I have come across a problem with changing the partition number of an RDD; my code is as below:

val rdd1 = sc.textFile(path1)
val rdd2 = sc.textFile(path2)
val rdd3 = sc.textFile(path3)
val imeiList = parseParam(job.jobParams)
val broadcastVar = sc.broadcast(imeiList)

val structuredRDD1 = rdd1.map(line => {
  val trunks = line.split("\t")
  if (trunks.length == 35) {
    (trunks(6).trim, trunks(7).trim, trunks(3).trim, trunks(5).trim, trunks(12).trim, trunks(13).trim.toLong)
  }
})

val structuredRDD2 = rdd2.map(line => {
  val trunks = line.split("\t")
  if (trunks.length == 33) {
    (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)
  }
})

val structuredRDD3 = rdd3.map(line => {
  val trunks = line.split("\t")
  if (trunks.length == 33) {
    (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)
  }
})

val unionedRDD = structuredRDD1.union(structuredRDD2).union(structuredRDD3)

val resRDD = unionedRDD.filter(arg => arg != null && arg != ())
  .map(arg => arg.asInstanceOf[(String, String, String, String, String, Long)])
  .filter(arg => imeiFilter(arg._1, broadcastVar.value, 0) || imeiFilter(arg._2, broadcastVar.value, 0))

val jsonStrRDD = resRDD.map(arg =>
  "{\"f_imei\" : \"" + arg._1 + "\", \"t_imei\" : \"" + arg._2 + "\", \"dgst\" : \"" + arg._3 + "\", \"n\" : \"" + arg._4 + "\", \"s\" : " + arg._5.toString() + ", \"ts\" : " + arg._6.toString() + "}")

val jsonArray = jsonStrRDD.collect

I noticed that there are 3834 tasks by default, and 3834 is the number of files in path1, path2 and path3. I wanted to change the number of partitions with the code below:

val rdd1 = sc.textFile(path1, 1920)
val rdd2 = sc.textFile(path2, 1920)
val rdd3 = sc.textFile(path3, 1920)

By doing this I expected 1920 tasks in total, but the number of tasks becomes 8920. Any idea what's going on here?

Thanks!

qinwei
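A minimal sketch of that suggestion (the target of 1920 partitions is taken from the original question):

val rdd1 = sc.textFile(path1)
val rdd2 = sc.textFile(path2)
val rdd3 = sc.textFile(path3)

// Union first (partition counts add up), then bring the total back down in one step.
val unionedRDD = rdd1.union(rdd2).union(rdd3).repartition(1920)

Note that repartition performs a shuffle to reach the requested count; if the goal is only to reduce the number of partitions, coalesce(1920) avoids the full shuffle.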
Re: How to use multi thread in RDD map function ?
Among the options of spark-submit there are two that may be helpful to your problem: --total-executor-cores NUM (standalone and mesos only) and --executor-cores NUM (yarn only).

qinwei

From: myasuka
Date: 2014-09-28 11:44
To: user
Subject: How to use multi thread in RDD map function ?

Hi, everyone

I have come across a problem with increasing the concurrency. In my program, after the shuffle write, each node should fetch 16 pairs of matrices to do matrix multiplication, such as:

import breeze.linalg.{DenseMatrix => BDM}

pairs.map(t => {
  val b1 = t._2._1.asInstanceOf[BDM[Double]]
  val b2 = t._2._2.asInstanceOf[BDM[Double]]
  val c = (b1 * b2).asInstanceOf[BDM[Double]]
  (new BlockID(t._1.row, t._1.column), c)
})

Each node has 16 cores. However, no matter whether I set 16 tasks or more per node, the concurrency cannot get higher than 60%, which means not every core on the node is computing. When I check the running log on the WebUI, looking at the amount of shuffle read and write in each task, I see that some tasks do one matrix multiplication, some do two, and some do none.

So I thought of using Java multithreading to increase the concurrency. I wrote a program in Scala which calls Java threads without Spark on a single node; watching the 'top' monitor, I find this program can use up to 1500% CPU (meaning nearly every core is computing). But I have no idea how to use Java multithreading in an RDD transformation. Can anyone provide some example code for using Java threads inside an RDD transformation, or give any idea for increasing the concurrency?

Thanks for all

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-multi-thread-in-RDD-map-function-tp15286.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
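For example, invocations along these lines (master URL, core counts and jar name are illustrative):

# Standalone / Mesos: cap the total cores the application may use across the cluster
spark-submit --master spark://host:7077 --total-executor-cores 48 my-app.jar

# YARN: set the number of cores per executor
spark-submit --master yarn --executor-cores 16 my-app.jar

Giving Spark enough cores per node, and enough tasks per stage to fill them, is usually a simpler route to full CPU usage than starting threads inside a map function.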
Re: Re: what is the best way to do cartesian
Thanks a lot for your reply, but I have tried the built-in RDD.cartesian() method before; it didn't make it faster.

qinwei

From: Alex Boisvert
Date: 2014-04-26 00:32
To: user
Subject: Re: what is the best way to do cartesian

You might want to try the built-in RDD.cartesian() method.

On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei <wei@dewmobile.net> wrote:

Hi All,

I have a problem with the item-based collaborative filtering recommendation algorithm in Spark. The basic flow is as below:

RDD1 ==>
  (Item1, (User1, Score1))
  (Item2, (User2, Score2))
  (Item1, (User2, Score3))
  (Item2, (User1, Score4))

RDD1.groupByKey ==> RDD2
  (Item1, ((User1, Score1), (User2, Score3)))
  (Item2, ((User1, Score4), (User2, Score2)))

The similarity of the vectors ((User1, Score1), (User2, Score3)) and ((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.

In my situation, RDD2 contains 20 million records, and my Spark program is extremely slow. The source code is as below:

val conf = new SparkConf().setMaster("spark://211.151.121.184:7077").setAppName("Score Calcu Total").set("spark.executor.memory", "20g").setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
val sc = new SparkContext(conf)

val mongoRDD = sc.textFile(args(0).toString, 400)
val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))
val newRDD = jsonRDD.map(arg => {
  var score = haha(arg.get("a").asInstanceOf[JSONObject])
  // set score to 0.5 for testing
  arg.put("score", 0.5)
  arg
})

val resourceScoresRDD = newRDD.map(arg => (arg.get("rid").toString.toLong, (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
val resourceScores = resourceScoresRDD.collect()
val bcResourceScores = sc.broadcast(resourceScores)

val simRDD = resourceScoresRDD.mapPartitions({ iter =>
  val m = bcResourceScores.value
  for {
    (r1, v1) <- iter
    (r2, v2) <- m
    if r1 < r2
  } yield (r1, r2, cosSimilarity(v1, v2))
}, true).filter(arg => arg._3 > 0.1)

println(simRDD.count)

And I saw this in the Spark Web UI:
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204018.png
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204001.png

My standalone cluster has 3 worker nodes (16 cores and 32G RAM each), and the load on the machines is heavy while the Spark program is running.

Is there any better way to implement the algorithm? Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-do-cartesian-tp4807.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Re: Problem with the Item-Based Collaborative Filtering Recommendation Algorithms in spark
Thanks a lot for your reply; it gave me much inspiration.

qinwei

From: Sean Owen
Date: 2014-04-25 14:10
To: user
Subject: Re: Problem with the Item-Based Collaborative Filtering Recommendation Algorithms in spark

So you are computing all-pairs similarity over 20M users? That is going to take about 200 trillion similarity computations, no? I don't think there's any way to make that fundamentally fast.

I see you're copying the data set to all workers, which helps make it faster at the expense of memory consumption.

If you really want to do this and can tolerate some approximation, I think you want to do some kind of locality-sensitive hashing to bucket the vectors and then evaluate similarity against only the other items in the same bucket.

On Fri, Apr 25, 2014 at 5:55 AM, Qin Wei <wei@dewmobile.net> wrote:

Hi All,

I have a problem with the item-based collaborative filtering recommendation algorithm in Spark. The basic flow is as below:

RDD1 ==>
  (Item1, (User1, Score1))
  (Item2, (User2, Score2))
  (Item1, (User2, Score3))
  (Item2, (User1, Score4))

RDD1.groupByKey ==> RDD2
  (Item1, ((User1, Score1), (User2, Score3)))
  (Item2, ((User1, Score4), (User2, Score2)))

The similarity of the vectors ((User1, Score1), (User2, Score3)) and ((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.

In my situation, RDD2 contains 20 million records, and my Spark program is extremely slow. The source code is as below:

val conf = new SparkConf().setMaster("spark://211.151.121.184:7077").setAppName("Score Calcu Total").set("spark.executor.memory", "20g").setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
val sc = new SparkContext(conf)

val mongoRDD = sc.textFile(args(0).toString, 400)
val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))
val newRDD = jsonRDD.map(arg => {
  var score = haha(arg.get("a").asInstanceOf[JSONObject])
  // set score to 0.5 for testing
  arg.put("score", 0.5)
  arg
})

val resourceScoresRDD = newRDD.map(arg => (arg.get("rid").toString.toLong, (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
val resourceScores = resourceScoresRDD.collect()
val bcResourceScores = sc.broadcast(resourceScores)

val simRDD = resourceScoresRDD.mapPartitions({ iter =>
  val m = bcResourceScores.value
  for {
    (r1, v1) <- iter
    (r2, v2) <- m
    if r1 < r2
  } yield (r1, r2, cosSimilarity(v1, v2))
}, true).filter(arg => arg._3 > 0.1)

println(simRDD.count)

And I saw this in the Spark Web UI:
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4808/QQ%E6%88%AA%E5%9B%BE20140424204018.png
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4808/QQ%E6%88%AA%E5%9B%BE20140424204001.png

My standalone cluster has 3 worker nodes (16 cores and 32G RAM each), and the load on the machines is heavy while the Spark program is running.

Is there any better way to implement the algorithm? Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-the-Item-Based-Collaborative-Filtering-Recommendation-Algorithms-in-spark-tp4808.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
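A minimal sketch of the bucketing idea, under strong assumptions: item vectors are hashed with a handful of pseudo-random hyperplanes whose weights are derived from the user id, cosSimilarity is the same helper as in the original code, and only a single hash table is used (a real LSH scheme would tune the number of planes and use several tables):

import scala.util.Random
import scala.util.hashing.MurmurHash3

// Sign pattern of a vector against numPlanes pseudo-random hyperplanes.
// The weight for a given (plane, user) pair is derived deterministically
// from the user id, so every item sees the same hyperplanes.
def lshSignature(vec: Seq[(String, Double)], numPlanes: Int = 8): Int =
  (0 until numPlanes).map { plane =>
    val dot = vec.map { case (user, score) =>
      score * new Random(MurmurHash3.stringHash(user, plane)).nextGaussian()
    }.sum
    if (dot >= 0) 1 << plane else 0
  }.sum

// Bucket items by signature, then compare only items that share a bucket.
val approxSimRDD = resourceScoresRDD
  .map { case (rid, scores) => (lshSignature(scores.toSeq), (rid, scores.toSeq)) }
  .groupByKey()
  .flatMap { case (_, items) =>
    val arr = items.toArray
    for {
      i <- arr.indices.iterator
      j <- (i + 1) until arr.length
    } yield (arr(i)._1, arr(j)._1, cosSimilarity(arr(i)._2, arr(j)._2))
  }
  .filter(_._3 > 0.1)

This trades some recall (similar items that land in different buckets are missed) for a large reduction in the number of pairwise comparisons.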
Re: Re: how to set spark.executor.memory and heap size
Try the complete path.

qinwei

From: wxhsdp
Date: 2014-04-24 14:21
To: user
Subject: Re: how to set spark.executor.memory and heap size

Thank you. I added setJars, but nothing changes:

val conf = new SparkConf()
  .setMaster("spark://127.0.0.1:7077")
  .setAppName("Simple App")
  .set("spark.executor.memory", "1g")
  .setJars(Seq("target/scala-2.10/simple-project_2.10-1.0.jar"))
val sc = new SparkContext(conf)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-tp4719p4732.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
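That is, an absolute path rather than one relative to the working directory; a sketch with a hypothetical project location:

val conf = new SparkConf()
  .setMaster("spark://127.0.0.1:7077")
  .setAppName("Simple App")
  .set("spark.executor.memory", "1g")
  // Use the jar's full path; the directory shown here is illustrative.
  .setJars(Seq("/home/user/simple-project/target/scala-2.10/simple-project_2.10-1.0.jar"))
val sc = new SparkContext(conf)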