Re: CollectAsMap, Broadcasting.
Correct me if I'm wrong, but he can actually run this code without broadcasting the users map; however, the code will be less efficient.

On Thu, 26 Feb 2015 at 12:31 PM, Sean Owen so...@cloudera.com wrote:
> Yes, but there is no concept of executors 'deleting' an RDD. And you
> would want to broadcast the usersMap if you're using it this way.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: CollectAsMap, Broadcasting.
Yes, that's correct; it works, but broadcasting would be more efficient.

On Thu, Feb 26, 2015 at 1:20 PM, Paweł Szulc paul.sz...@gmail.com wrote:
> Correct me if I'm wrong, but he can actually run this code without
> broadcasting the users map; however, the code will be less efficient.
Re: CollectAsMap, Broadcasting.
No. That code is just Scala code executing on the driver. usersMap is a local object. This bit has nothing to do with Spark. Yes, you would have to broadcast it to use it efficiently in functions (not on the driver).

On Thu, Feb 26, 2015 at 10:24 AM, Guillermo Ortiz konstt2...@gmail.com wrote:
> When I execute usersMap(v._1), does the driver have to send the executor
> the value it needs? How does the data transfer between usersMap (just in
> the driver) and the executors work?
CollectAsMap, Broadcasting.
I have a question. If I execute this code:

val users = sc.textFile("/tmp/users.log").map(x => x.split(",")).map(v => (v(0), v(1)))
val contacts = sc.textFile("/tmp/contacts.log").map(y => y.split(",")).map(v => (v(0), v(1)))
val usersMap = contacts.collectAsMap()
contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()

When I execute collectAsMap, where is the data? In each executor? I guess that each executor has the data that it processed. The result is sent to the driver, but I guess that each executor keeps its piece of the processed data.

I guess that it's more efficient than using a join in this case because there's no shuffle, but if I save usersMap as a broadcast variable, wouldn't it be less efficient, because I'm sending data to executors that don't need it?
Re: CollectAsMap, Broadcasting.
No, it exists only on the driver, not the executors. Executors don't retain partitions unless they are supposed to be persisted.

Generally, broadcasting a small Map to accomplish a join 'manually' is more efficient than a join, but you are right that this is mostly because joins usually involve shuffles. If not, it's not as clear which way is best. I suppose that if the Map is large-ish, it's safer to not keep pulling it to the driver.
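[Editor's note] The 'manual' map-side join discussed above can be sketched without Spark at all, using plain Scala collections in place of the collected map and an RDD partition. The keys and values here are made up for illustration:

```scala
// Hypothetical lookup table, as collectAsMap() would produce from a small RDD
val usersMap = Map("u1" -> "Alice", "u2" -> "Bob")

// Hypothetical (key, contact) pairs, standing in for one partition of contacts
val contacts = Seq(("u1", "c9"), ("u2", "c7"))

// Map-side join: each element is matched against the in-memory map,
// so no shuffle is needed
val joined = contacts.map { case (id, contact) => (id, (usersMap(id), contact)) }
// joined: Seq(("u1", ("Alice", "c9")), ("u2", ("Bob", "c7")))
```

In Spark this map function runs inside the executors, which is why the lookup table must either be serialized into the task closure or, better, broadcast once per executor.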
Re: CollectAsMap, Broadcasting.
So, in my example, when I execute:

val usersMap = contacts.collectAsMap()

the Map goes to the driver and just lives there in the beginning. Then:

contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()

When I execute usersMap(v._1), does the driver have to send the executor the value it needs? I guess I'm missing something. How does the data transfer between usersMap (just in the driver) and the executors work? In this case it looks better to use broadcasting, like:

val usersMap = contacts.collectAsMap()
val bc = sc.broadcast(usersMap)
contacts.map(v => (v._1, (bc.value(v._1), v._2))).collect()
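[Editor's note] One practical detail of the usersMap(v._1) pattern, not raised in the thread: applying a Map like a function throws NoSuchElementException when a key is absent, while Map.get gives an inner-join effect by dropping unmatched keys. A plain-Scala sketch with made-up data:

```scala
// Hypothetical data: "u3" has no entry in the lookup map
val usersMap = Map("u1" -> "Alice")
val contacts = Seq(("u1", "c9"), ("u3", "c2"))

// usersMap("u3") would throw NoSuchElementException, so use get instead,
// which drops unmatched keys the way an inner join would
val joined = contacts.flatMap { case (id, contact) =>
  usersMap.get(id).map(name => (id, (name, contact)))
}
// joined: Seq(("u1", ("Alice", "c9")))
```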
Re: CollectAsMap, Broadcasting.
Isn't contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect() executed in the executors? Why is it executed in the driver? contacts is not a local object, right?

On 2015-02-26 at 11:27 GMT+01:00, Sean Owen so...@cloudera.com wrote:
> No. That code is just Scala code executing on the driver. usersMap is a
> local object. This bit has nothing to do with Spark.
Re: CollectAsMap, Broadcasting.
Yes, in that code, usersMap has been serialized to every executor. I thought you were referring to accessing the copy in the driver.

On Thu, Feb 26, 2015 at 10:47 AM, Guillermo Ortiz konstt2...@gmail.com wrote:
> Isn't contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect() executed
> in the executors? Why is it executed in the driver?
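[Editor's note] What "serialized to every executor" means can be illustrated outside Spark: a closure referencing usersMap is shipped with Java serialization, so each task deserializes its own copy of the map. A rough sketch using plain JDK serialization, with no Spark involved:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

val usersMap = Map("u1" -> "Alice", "u2" -> "Bob")

// Serialize the map, roughly what happens when a closure capturing it is shipped
val bytes = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(usersMap)
  oos.close()
  bos.toByteArray
}

// Deserialize, roughly what an executor does on receiving a task
val copy = new ObjectInputStream(new ByteArrayInputStream(bytes))
  .readObject().asInstanceOf[Map[String, String]]

// Equal in content, but a distinct object: every task gets its own copy,
// which is why a broadcast (one copy per executor) is cheaper
assert(copy == usersMap)
assert(!(copy eq usersMap))
```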
Re: CollectAsMap, Broadcasting.
One last time, to be sure I got it right: does the execution sequence here go like this?

val usersMap = contacts.collectAsMap() # the contacts RDD is collected by the executors and sent to the driver; the executors delete the RDD
contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect() # the usersMap object is sent to the executors to run the code, and with the collect(), the result is sent back to the driver

On 2015-02-26 at 11:57 GMT+01:00, Sean Owen so...@cloudera.com wrote:
> Yes, in that code, usersMap has been serialized to every executor. I
> thought you were referring to accessing the copy in the driver.
Re: CollectAsMap, Broadcasting.
Yes, but there is no concept of executors 'deleting' an RDD. And you would want to broadcast the usersMap if you're using it this way.

On Thu, Feb 26, 2015 at 11:26 AM, Guillermo Ortiz konstt2...@gmail.com wrote:
> One last time, to be sure I got it right: does the execution sequence here
> go like this?