Re: Is this a reasonable use of core.async?
I personally think Thomas' version is best if load may vary, as it is more predictable and straightforward to understand. If we're talking lines of code, here's a shortened version that I don't feel sacrifices readability (typed on a phone, so please excuse typos...):

    (import '(java.util.concurrent Executors))

    (let [exec    (Executors/newFixedThreadPool 4)
          results (->> widgets
                       (mapv (fn [it] (.submit exec #(long-running-widget-processor it))))
                       (mapv #(.get %)))]
      (.shutdown exec)
      results)

I think the pure futures version, kinda like what Gary suggested, is best though, because it's straightforward, to the point and idiomatic, and you eliminate the need for core.async. Having said that, if you are already using core.async elsewhere in your project, then I don't see anything particularly wrong with your way.

On 18 Sep 2014 06:41, Beau Fabry wrote:
> Larry, your solution includes the cognitive overhead of another entire library and process model. future is part of core, and, as I realised when Gary posted, the doall's were unnecessary anyway.
>
> On Thursday, September 18, 2014 3:26:36 PM UTC+10, larry google groups wrote:
>>> Thanks for that Larry but I think this is a bit of overkill for my scenario.
>>
>> If I'm counting correctly, your original example has 10 lines of code, and my example has 11 lines of code (minus the try/catch and the closure and the namespace declaration). So these 2 solutions are the same length. These are the 11 lines of code:
>>
>>     (def ^:private persistence-channel (lamina/channel))
>>     (defn persist-this-item [context-wrapper-for-database-call]
>>       (lamina/enqueue persistence-channel
>>         (fn [] (persistence/make-consistent context-wrapper-for-database-call))))
>>     (defn worker []
>>       (loop [closure-with-item-inside @(lamina/read-channel persistence-channel)]
>>         (closure-with-item-inside)
>>         (recur @(lamina/read-channel persistence-channel))))
>>     (defn start-workers []
>>       (dotimes [_ 6]
>>         (future (worker))))
>>
>> Maybe it's just that I'm familiar with the code, but these 11 lines of code seem cleaner to me than your 10 lines of code, at least in part because you are doing stuff like calling (doall), which strikes me as a bit suspicious. Do you really feel the 1 extra line of code is overkill? This solution seems to do what you want, and it's the same length as your solution.
Re: Is this a reasonable use of core.async?
Yeah Dan, Thomas's code is an awesome example for anyone looking to solve a similar problem with variable load. In my case this is a predictable batch process and I know the load exactly :-) I ended up implementing the futures approach for the reasons you listed; it was our only use of core.async, and now it's gone from our dependencies :-)

On Thursday, September 18, 2014 6:11:24 PM UTC+10, Dan Kersten wrote:
> I personally think Thomas' is best if load may vary as it is more predictable and straightforward to understand. [...]
Re: Is this a reasonable use of core.async?
Sounds like a job for a future. Something like:

    (->> job-list
         (partition-in-sublists 4)
         (mapv #(future (do-job-on-sublist %)))
         (mapv deref))

This is untested and written on a phone, so it might not even be syntactically correct, but the future calls will create new threads to execute the do-job functions on the sublists, and the deref call on a future is blocking. If the result from the futures is not important, I guess you could save some memory by using map and doall instead of mapv, though I doubt it would make any difference if you have only 4 sublists.

Alternatively, have you looked at pmap?

On Wednesday, 17 September 2014, Beau Fabry wrote:
> We don't have streams of data here; the long-running tasks have side effects. I would prefer to avoid adding another whole framework just to run a few long-running jobs in parallel. I have a list of jobs to do, I'm partitioning that list into 4 sublists to be worked through by 4 parallel workers, and I then want to block and wait until all 4 workers have finished their tasks.
>
> On Wednesday, September 17, 2014 3:27:07 AM UTC+10, larry google groups wrote:
>> This does not look correct to me. Perhaps someone else has more insight into this. I am suspicious about 2 things:
>>
>> 1.) your use of doall
>> 2.) your use of (thread)
>>
>> It looks to me like you are trying to hack together a kind of pipeline or channel. Clojure has a wealth of libraries that can handle that for you. The main thing you are trying to do is this:
>>
>>     (long-running-widget-processor widget)
>>
>> You go to some trouble to set up workers, all to ensure that long-running-widget-processor is handled in its own thread. I would suggest you look at Lamina:
>>
>> https://github.com/ztellman/lamina
>>
>> In particular, look at pipelines:
>>
>> https://github.com/ztellman/lamina/wiki/Pipelines
>>
>> On Friday, September 5, 2014 1:46:02 AM UTC-4, Beau Fabry wrote:
>>> Is the kinda ugly constant (doall) usage a sign that I'm doing something silly?
>>>
>>>     (require '[clojure.core.async :refer [thread <!!]])
>>>
>>>     (let [num-workers 4
>>>           widgets-per-worker (inc (int (/ (count widgets) num-workers)))
>>>           bucketed-widgets (partition-all widgets-per-worker widgets)
>>>           workers (doall (map (fn [widgets]
>>>                                 (thread
>>>                                   (doseq [widget widgets]
>>>                                     (long-running-widget-processor widget))
>>>                                   true))
>>>                               bucketed-widgets))]
>>>       (doall (map <!! workers)))
>>>
>>> https://gist.github.com/bfabry/ad830b1888e4fc550f88
>>>
>>> All comments appreciated :-)
>>>
>>> Cheers,
>>> Beau

--
You received this message because you are subscribed to the Google Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
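A minimal runnable sketch of the futures approach Gary describes, applied to the bucketing in Beau's original snippet. `long-running-widget-processor` here is a hypothetical stand-in (the real one never appears in the thread) that sleeps and records the widget, and `process-in-parallel` is an illustrative name. One detail worth calling out: the futures are created with `mapv`, which is eager, so every bucket starts before the first `deref` blocks. With a lazy `map` over a non-chunked seq, each future could be started only after the previous one had already been waited on.

```clojure
(ns example.futures-sketch)

;; Hypothetical stand-in for the real processor, which the thread never shows:
;; it sleeps briefly and records the widget it was given.
(def processed (atom []))

(defn long-running-widget-processor [widget]
  (Thread/sleep 10)
  (swap! processed conj widget))

(defn process-in-parallel
  "Splits widgets into at most num-workers buckets, runs each bucket on its
  own future, and blocks until every bucket has finished. Returns nil."
  [num-workers widgets]
  (let [;; ceil so we never create more than num-workers buckets;
        ;; max 1 because partition-all with size 0 would never terminate.
        per-worker (max 1 (long (Math/ceil (/ (count widgets) num-workers))))
        ;; mapv is eager, so every future is started before we block on any.
        workers    (mapv (fn [bucket]
                           (future
                             (doseq [widget bucket]
                               (long-running-widget-processor widget))))
                         (partition-all per-worker widgets))]
    ;; deref blocks until that worker's future completes.
    (run! deref workers)))
```

Called as `(process-in-parallel 4 (range 10))`, this splits the ten widgets into buckets of 3, 3, 3 and 1 and returns once all four futures have finished. No core.async or doall is involved.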
Re: Is this a reasonable use of core.async?
pmap isn't an option, as the processes kicked off could affect other systems' load if we can't control the level of parallelization. futures seem like they'd work quite well (the return value of the jobs is nil; it's a doseq). I might rewrite it with futures at some point, although it really just seems like a slightly cleaner way of doing exactly what I did with core.async :-) Thanks Gary!

On Wednesday, September 17, 2014 5:31:34 PM UTC+10, Gary Verhaegen wrote:
> Sounds like a job for a future. [...]
Re: Is this a reasonable use of core.async?
core.async is more about coordination and communication than about doing things in parallel; that's just a pleasant side effect. In your case you don't need anything core.async provides. You could use either reducers or java.util.concurrent.ExecutorService:

    (import '(java.util.concurrent Executors))

    (let [exec    (Executors/newFixedThreadPool 4)
          process (fn [it] (.submit exec #(long-running-widget-processor it)))
          get     (fn [future] (.get future))]
      (->> widgets
           (mapv process)
           (mapv get)))

This skips the partitioning of the inputs and lets the thread pool deal with spreading the work, which is probably the best way to ensure that all 4 threads stay busy until all the work is done. (Note: I did not run this code.)

Reducers would be good if the number of items processed is large enough and you are doing pure number crunching with no I/O. Otherwise the fork/join overhead probably costs too much.

HTH,
/thomas

On Friday, September 5, 2014 7:46:02 AM UTC+2, Beau Fabry wrote:
> Is the kinda ugly constant (doall) usage a sign that I'm doing something silly? [...]
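Thomas's remark about reducers is easiest to see with a pure computation rather than Beau's side-effecting jobs. This sketch assumes nothing beyond `clojure.core.reducers`, which ships with Clojure 1.5 and later; `sum-of-squares` is an illustrative name. `r/fold` only runs in parallel for foldable collections such as vectors and maps, and falls back to a plain sequential reduce for other seqs.

```clojure
(ns example.reducers-sketch
  (:require [clojure.core.reducers :as r]))

(defn sum-of-squares
  "Pure number crunching over a vector. r/fold splits the vector into chunks
  (512 elements by default), reduces each chunk on the fork/join pool, and
  combines the partial sums with + (whose zero-arity call supplies 0)."
  [v]
  (r/fold + (r/map #(* % %) v)))

(sum-of-squares (vec (range 1000))) ; => 332833500
```

For I/O-bound work like Beau's widgets, the fork/join splitting buys nothing, which is why Thomas points to an ExecutorService instead.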
Re: Is this a reasonable use of core.async?
Uh, forgot to .shutdown the exec. Proper cleanup is important; otherwise the threads might hang around for a bit.

On Wednesday, September 17, 2014 1:26:00 PM UTC+2, Thomas Heller wrote:
> core.async is more about coordination and communication than about doing things in parallel; that's just a pleasant side effect. [...]
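Combining Thomas's snippet with his follow-up about `.shutdown`, one way to make the cleanup unconditional is to shut the pool down in a `finally` block, so it happens even when a task throws. A fixed pool's threads are non-daemon and do not time out by default, so a pool that is never shut down can keep the JVM from exiting. This is a sketch: `run-all` is a hypothetical helper name, not part of any library. It also avoids shadowing `clojure.core/get` and `future` with local names.

```clojure
(ns example.executor-sketch
  (:import (java.util.concurrent Callable ExecutorService Executors Future)))

(defn run-all
  "Calls (f item) for every item on a fixed pool of n threads, blocks until
  all calls finish, and returns their results in input order. The pool is
  shut down in a finally block, so it is cleaned up even when a task throws
  (.get rethrows the failure wrapped in an ExecutionException)."
  [n f items]
  (let [^ExecutorService exec (Executors/newFixedThreadPool (int n))]
    (try
      (->> items
           (mapv (fn [item]
                   ;; Hinting the task as Callable selects the
                   ;; submit(Callable) overload, whose Future's .get
                   ;; returns f's result.
                   (let [^Callable task (fn [] (f item))]
                     (.submit exec task))))
           ;; All tasks are submitted before we start blocking on results.
           (mapv (fn [^Future fut] (.get fut))))
      (finally
        (.shutdown exec)))))
```

For Beau's case the call would be something like `(run-all 4 long-running-widget-processor widgets)`, with the return values simply ignored.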
Re: Is this a reasonable use of core.async?
On Tuesday, September 16, 2014 10:00:07 PM UTC-4, Beau Fabry wrote:
> We don't have streams of data here, the long running tasks have side-effects. I would prefer to avoid adding another whole framework just to run a few long running jobs in parallel.

I guess I should show you some code, so you can see how simple this is. I'll copy-and-paste some code that I use. One simple way I use Lamina is to save stuff to a database. I don't want the save action happening in my main thread, so I put the data in a channel, and I let some workers pull that data off the channel and put it in the database. So what follows is the whole file, about 30 lines of code, including some try/catch stuff that you probably don't need:

    (ns loupi.persistence-queue
      (:require [loupi.persistence :as persistence]
                [slingshot.slingshot :as ss]
                [lamina.core :as lamina]))

    (def ^:private persistence-channel (lamina/channel))

    (defn persist-this-item [context-wrapper-for-database-call]
      (lamina/enqueue persistence-channel
        (fn []
          (ss/try+
            (persistence/make-consistent context-wrapper-for-database-call)
            (catch Object o
              (ss/throw+ {:type :loupi.supervisor/problem
                          :message "Error in persistence-queue/persist-this-item."
                          :data o}))))))

    (defn worker []
      (loop [closure-with-item-inside @(lamina/read-channel persistence-channel)]
        (ss/try+
          (closure-with-item-inside)
          (catch Object o
            (ss/throw+ {:type :loupi.supervisor/problem
                        :message "Error in persistence-queue/worker."
                        :closure closure-with-item-inside
                        :data o})))
        (recur @(lamina/read-channel persistence-channel))))

    (defn start-workers []
      (dotimes [_ 6]
        (println "Starting up the persist queue workers.")
        (future (worker))))

I call (start-workers) when the app starts. When I save something to the database, I call (persist-this-item), and that puts a closure on the channel. The workers eventually grab that closure and execute it. Clearly, that closure can do whatever you like. To borrow from your original example, that closure is where you would put:

    (long-running-widget-processor widget)
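For readers without Lamina, here is a rough equivalent of larry's worker setup using only the JDK: a `java.util.concurrent.LinkedBlockingQueue` stands in for the Lamina channel (a swapped-in technique, not Lamina's API), and `enqueue-job!`, `start-workers`, `stop-workers` and the `::stop` poison pill are illustrative names. One deliberate difference: a failing job is caught and reported instead of rethrown. In larry's version, as far as I can tell, the `throw+` inside `worker` escapes the `loop`, which ends that worker's future, and since nothing derefs those futures the error would go unnoticed.

```clojure
(ns example.queue-workers
  (:import (java.util.concurrent BlockingQueue LinkedBlockingQueue)))

;; A plain JDK queue standing in for the Lamina channel. This is a swapped-in
;; technique for illustration, not Lamina's API.
(def ^:private ^BlockingQueue work-queue (LinkedBlockingQueue.))

(defn enqueue-job!
  "Puts a zero-argument closure on the queue; whichever worker is free runs it."
  [job]
  (.put work-queue job))

(defn- worker []
  (loop []
    (let [job (.take work-queue)]      ; blocks until something is queued
      (when-not (= job ::stop)          ; ::stop is a poison pill that ends this worker
        (try
          (job)
          (catch Exception e
            ;; Report and keep going, so one bad job does not end the worker.
            (println "job failed:" (.getMessage e))))
        (recur)))))

(defn start-workers
  "Starts n workers, each looping on its own future; returns the futures."
  [n]
  (vec (repeatedly n #(future (worker)))))

(defn stop-workers
  "Queues one ::stop per worker, then waits for every worker to exit.
  Pills go in after any queued jobs, so those jobs are all run first."
  [workers]
  (dotimes [_ (count workers)]
    (.put work-queue ::stop))
  (run! deref workers))
```

Usage mirrors larry's description: call `(start-workers 6)` at startup, then `(enqueue-job! #(long-running-widget-processor widget))` wherever work is produced.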
Re: Is this a reasonable use of core.async?
Thanks for that Larry, but I think this is a bit of overkill for my scenario. The code I pasted is almost verbatim what we have in our production codebase, so the ability to queue new jobs etc. is really not needed. Cheers though.

On Thursday, September 18, 2014 9:38:47 AM UTC+10, larry google groups wrote:
> I guess I should show you some code, so you can see how simple this is. [...]
Re: Is this a reasonable use of core.async?
> Sounds like a job for a future.

If he knows the workload, and he knows how many threads will be spun up, and he knows that the number will be reasonably small, then (future) is a good bet. But if there is any risk of large numbers of threads being spun up, then he should avoid calling (future) and should instead use a library that takes on the work of managing a thread pool for him. He can call (future) if he knows there are only going to be 8 threads, or 100, but if there might be 10,000 threads spun up, then he should probably not call (future) directly.

On Wednesday, September 17, 2014 3:31:34 AM UTC-4, Gary Verhaegen wrote:
Sounds like a job for a future. Something like:

(->> job-list
     (partition-in-sublists 4)
     ;; mapv (not map) so every future is started before the first deref,
     ;; whatever kind of seq partition-in-sublists returns
     (mapv #(future (do-job-on-sublist %)))
     (mapv deref))

This is untested and written on a phone, so might not even be syntactically correct, but the future calls will create new threads to execute the do-job functions on the sublists, and the deref call on a future is blocking. If the result from the futures is not important, I guess you could save some memory by using map and doall instead of mapv, though I doubt it would make any difference if you have only 4 sublists. Alternatively, have you looked at pmap?
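Larry's concern about thread counts can be addressed without an extra Clojure library. Every (future ...) that is still running occupies its own thread, because future submits to an unbounded cached thread pool. A fixed-size ExecutorService from java.util.concurrent caps how many tasks run at once. The following is a minimal sketch of that approach, not code from this thread, and the helper name process-all-bounded is hypothetical:

```clojure
(import '(java.util.concurrent Executors ExecutorService Future))

(defn process-all-bounded
  "Hypothetical helper: runs (f item) for each item on a fixed pool of
  pool-size threads, blocks until every task finishes, and returns the
  results in input order. At most pool-size tasks run at the same time."
  [pool-size f items]
  (let [^ExecutorService pool (Executors/newFixedThreadPool (int pool-size))]
    (try
      ;; Clojure fns implement java.util.concurrent.Callable, so a vector of
      ;; thunks can go straight to invokeAll, which blocks until every task
      ;; has completed (normally or by throwing).
      (let [tasks   (mapv (fn [item] (fn [] (f item))) items)
            results (.invokeAll pool tasks)]
        ;; .get rethrows a task's exception wrapped in ExecutionException
        (mapv (fn [^Future fut] (.get fut)) results))
      (finally
        ;; stop accepting work so the pool's threads can exit
        (.shutdown pool)))))
```

For example, `(process-all-bounded 4 long-running-widget-processor widgets)` would keep at most 4 widgets in flight however long the list is. The cost of this approach is that the pool is created and shut down on each call.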
Re: Is this a reasonable use of core.async?
> Thanks for that Larry but I think this is a bit of overkill for my scenario.

If I'm counting correctly, your original example has 10 lines of code, and my example has 11 lines of code (minus the try/catch, the closure and the namespace declaration). So these 2 solutions are the same length. These are the 11 lines of code:

(def ^:private persistence-channel (lamina/channel))
(defn persist-this-item [context-wrapper-for-database-call]
  (lamina/enqueue persistence-channel
    (fn [] (persistence/make-consistent context-wrapper-for-database-call))))
(defn worker []
  (loop [closure-with-item-inside @(lamina/read-channel persistence-channel)]
    (closure-with-item-inside)
    (recur @(lamina/read-channel persistence-channel))))
(defn start-workers []
  (dotimes [_ 6]
    (future (worker))))

Maybe it's just that I'm familiar with the code, but these 11 lines seem cleaner to me than your 10, at least in part because you are calling (doall), which strikes me as a bit suspicious. Do you really feel the 1 extra line of code is overkill? This solution seems to do what you want, and it's the same length as yours.

On Wednesday, September 17, 2014 8:48:09 PM UTC-4, Beau Fabry wrote:
Thanks for that Larry but I think this is a bit of overkill for my scenario. The code I pasted is almost verbatim what we have in our production codebase, so the ability to queue new jobs etc. is really not needed. Cheers though.

On Thursday, September 18, 2014 9:38:47 AM UTC+10, larry google groups wrote:
> We don't have streams of data here, the long running tasks have side-effects. I would prefer to avoid adding another whole framework just to run a few long running jobs in p//.

I guess I should show you some code, so you can see how simple this is. I'll copy-and-paste some code that I use. One simple way I use Lamina is to save stuff to a database. I don't want the save action happening in my main thread, so I put the data in a channel, and I let some workers pull that data off the channel and put it in the database. So what follows is the whole file. It is about 30 lines of code, including some try/catch stuff that you probably don't need:

(ns loupi.persistence-queue
  (:require [loupi.persistence :as persistence]
            [slingshot.slingshot :as ss]
            [lamina.core :as lamina]))

(def ^:private persistence-channel (lamina/channel))

(defn persist-this-item [context-wrapper-for-database-call]
  (lamina/enqueue persistence-channel
    (fn []
      (ss/try+
        (persistence/make-consistent context-wrapper-for-database-call)
        (catch Object o
          (ss/throw+ {:type :loupi.supervisor/problem
                      :message "Error in persistence-queue/persist-this-item."
                      :data o}))))))

(defn worker []
  (loop [closure-with-item-inside @(lamina/read-channel persistence-channel)]
    (ss/try+
      (closure-with-item-inside)
      (catch Object o
        (ss/throw+ {:type :loupi.supervisor/problem
                    :message "Error in persistence-queue/worker."
                    :closure closure-with-item-inside
                    :data o})))
    (recur @(lamina/read-channel persistence-channel))))

(defn start-workers []
  (dotimes [_ 6]
    (println "Starting up the persist queue workers.")
    (future (worker))))

I call (start-workers) when the app starts. When I save something to the database, I call (persist-this-item), which puts a closure on the channel. The workers eventually grab that closure and execute it. Clearly, that closure can do whatever you like. To borrow from your original example, that closure is where you would put:

(long-running-widget-processor widget)

On Tuesday, September 16, 2014 10:00:07 PM UTC-4, Beau Fabry wrote:
We don't have streams of data here, the long running tasks have side-effects.
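For comparison, the same queue-plus-workers pattern can be sketched using only clojure.core and java.util.concurrent. This version replaces Lamina's channel with a LinkedBlockingQueue, so it is a substitute technique and not Lamina's API. The names job-queue, enqueue-job!, worker and start-workers! are hypothetical. There is one deliberate difference from Larry's file. Larry's worker rethrows an exception from inside (future (worker)), which ends that worker's loop, and the exception only surfaces if someone derefs the future. The worker below logs the failure and keeps looping instead.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Stand-in for Lamina's channel: an unbounded, thread-safe FIFO of thunks.
(def ^:private ^LinkedBlockingQueue job-queue (LinkedBlockingQueue.))

(defn enqueue-job!
  "Hypothetical: puts a zero-argument fn on the queue for some worker to run."
  [thunk]
  (.put job-queue thunk))

(defn- worker []
  (loop []
    ;; .take blocks until a job is available. It throws InterruptedException,
    ;; which ends this loop, if the worker's future is cancelled.
    (let [job (.take job-queue)]
      (try
        (job)
        (catch Exception e
          ;; log and keep looping, so one failing job does not kill the worker
          (binding [*out* *err*]
            (println "job failed:" (.getMessage e))))))
    (recur)))

(defn start-workers!
  "Hypothetical: starts n long-lived workers, each on its own future, and
  returns the futures so they can be cancelled with future-cancel."
  [n]
  (vec (repeatedly n #(future (worker)))))
```

In a script, cancel the workers and call (shutdown-agents) before exiting. Otherwise the JVM may wait on the future threads.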
Re: Is this a reasonable use of core.async?
Larry, your solution includes the cognitive overhead of another entire library and process model. future is part of core, and as I realised when Gary posted, the doalls were unnecessary anyway, since mapv is already eager.

On Thursday, September 18, 2014 3:26:36 PM UTC+10, larry google groups wrote:
If I'm counting correctly, your original example has 10 lines of code, and my example has 11 lines of code (minus the try/catch and the closure and the namespace declaration).
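A futures-only version of the original bucket-and-block code might look like the sketch below. The name process-in-buckets is hypothetical. The sketch depends on mapv being eager. partition-all returns an unchunked lazy seq, so a plain lazy map over it would create each future only when the deref step reached it. The buckets would then run one after another.

```clojure
(defn process-in-buckets
  "Hypothetical helper: splits items into at most n-workers buckets, runs f
  on every item of each bucket inside its own future, and blocks until all
  buckets are done. Returns the size of each bucket, in order."
  [n-workers f items]
  (let [items       (vec items)
        ;; ceiling division, so the items fill at most n-workers buckets
        bucket-size (max 1 (quot (+ (count items) (dec n-workers)) n-workers))
        buckets     (partition-all bucket-size items)
        ;; mapv is eager: every future is started here, before any deref.
        ;; With (map ...) over partition-all's unchunked seq, each future
        ;; would only be created when the deref step below reached it.
        workers     (mapv (fn [bucket]
                            (future
                              (run! f bucket)
                              (count bucket)))
                          buckets)]
    ;; deref blocks until that future finishes; mapv forces every deref
    (mapv deref workers)))
```

With this helper, `(process-in-buckets 4 long-running-widget-processor widgets)` plays the role of the original let block, and it needs no doall.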
Re: Is this a reasonable use of core.async?
This does not look correct to me. Perhaps someone else has more insight into this. I am suspicious about 2 things:

1.) your use of doall
2.) your use of (thread)

It looks to me like you are trying to hack together a kind of pipeline or channel. Clojure has a wealth of libraries that can handle that for you. The main thing you are trying to do is this:

(long-running-widget-processor widget)

You go to some trouble to set up workers, all to ensure that long-running-widget-processor is handled in its own thread. I would suggest you look at Lamina: https://github.com/ztellman/lamina

In particular, look at pipelines: https://github.com/ztellman/lamina/wiki/Pipelines

On Friday, September 5, 2014 1:46:02 AM UTC-4, Beau Fabry wrote:
Is the kinda ugly constant (doall) usage a sign that I'm doing something silly?
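The doall in the original post does real work, because map is lazy. Without doall, mapping (thread ...) over the buckets only builds a lazy seq, and no worker starts until something walks it. A small sketch shows this. count-started is a hypothetical name, and an atom counter stands in for (thread ...):

```clojure
(defn count-started
  "Maps a stand-in for (thread ...) over 4 buckets, optionally forcing the
  lazy seq with doall, and returns how many workers actually started."
  [force?]
  (let [started      (atom 0)
        ;; hypothetical stand-in for (thread ...): just records that it ran
        start-worker (fn [bucket] (swap! started inc) bucket)
        workers      (map start-worker (partition-all 2 (range 8)))]
    (when force?
      (doall workers))
    @started))

(count-started false) ;=> 0, map only builds a lazy seq, so nothing has started
(count-started true)  ;=> 4, doall realizes every element, starting every worker
```

Replacing each (doall (map ...)) with (mapv ...) is an equally eager option.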
Re: Is this a reasonable use of core.async?
We don't have streams of data here; the long-running tasks have side effects. I would prefer to avoid adding another whole framework just to run a few long-running jobs in p//. I have a list of jobs to do. I'm partitioning that list into 4 sublists to be worked through by 4 p// workers, and then I want to block and wait until all 4 workers have finished their tasks.

On Wednesday, September 17, 2014 3:27:07 AM UTC+10, larry google groups wrote:
This does not look correct to me. Perhaps someone else has more insight into this.
Is this a reasonable use of core.async?
Is the kinda ugly constant (doall) usage a sign that I'm doing something silly?

(require '[clojure.core.async :refer [thread <!!]])

(let [num-workers 4
      widgets-per-worker (inc (int (/ (count widgets) num-workers)))
      bucketed-widgets (partition-all widgets-per-worker widgets)
      workers (doall (map (fn [widgets]
                            (thread
                              (doseq [widget widgets]
                                (long-running-widget-processor widget))
                              true))
                          bucketed-widgets))]
  (doall (map <!! workers)))

https://gist.github.com/bfabry/ad830b1888e4fc550f88

All comments appreciated :-)

Cheers,
Beau
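There is one detail in the bucket arithmetic above. (inc (int (/ n k))) rounds up, but it also adds 1 when n is an exact multiple of k. With 8 widgets and 4 workers, that gives buckets of 3, so partition-all produces only 3 buckets and only 3 workers are started. Ceiling division avoids this. In the sketch below, the helper names bucket-counts, size-original and size-ceil are hypothetical:

```clojure
(defn bucket-counts
  "How many buckets partition-all produces for n items at the given size."
  [n size]
  (count (partition-all size (range n))))

;; The formula from the post above: (inc (int (/ n k))).
;; It rounds up, but also adds 1 when n is an exact multiple of k.
(defn size-original [n k]
  (inc (int (/ n k))))

;; Ceiling division: the smallest bucket size that gives at most k buckets.
(defn size-ceil [n k]
  (max 1 (quot (+ n (dec k)) k)))

(bucket-counts 8 (size-original 8 4))   ;=> 3, so only 3 workers get started
(bucket-counts 8 (size-ceil 8 4))       ;=> 4
(bucket-counts 10 (size-original 10 4)) ;=> 4
```

Every widget is still processed either way. The difference is only how evenly the work is spread across the workers.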