Re: Seeking a function to partially parallelize collection processing

2017-07-14 Thread arthur
I have a few questions to ask if this doesn't solve your issue, but first, how about
something as simple as:

(pmap (partial map handler) (partition-by splitter collection))

partition-by is lazy, and pmap is lazy-ish.
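
For reference, here is how partition-by groups the example data from the
original post (the adjacent :id 1 records end up in one group, which pmap
would then hand to (partial map handler)):

(partition-by :id [{:id 1 :val 1} {:id 1 :val 2} {:id 3 :val 1}])
;; => (({:id 1, :val 1} {:id 1, :val 2}) ({:id 3, :val 1}))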





Re: Seeking a function to partially parallelize collection processing

2017-06-26 Thread Tom Connors
Thanks for these two examples, Didier. In particular, I like the second one 
a lot. I'm currently using a slightly altered version of my first solution 
that avoids batching, but the code has gotten pretty nasty. Once I get a 
chance I'll clean up my solution, benchmark yours and mine, and post here
for posterity.



Re: Seeking a function to partially parallelize collection processing

2017-06-20 Thread Didier
Here:

(ns dda.test)

(def test-infinite-lazy-seq
  (repeatedly (fn [] {:id (rand-int 2)
                      :val (rand-int 10)})))

(def test-finite-seq [{:id 1 :val 1}
                      {:id 1 :val 2}
                      {:id 3 :val 1}])

(defn parallel-per
  [k seqf ls]
  (pmap #(map seqf %) (vals (group-by #(k %) ls))))

(defn get-next
  "Your code to call Kinesis for the next x items would go here."
  [x]
  (take x test-infinite-lazy-seq))

(def processed (atom []))

(dotimes [_ 3] ; This would be a doseq instead, or whatever you need it to be
  (swap! processed
         #(concat % (parallel-per :id
                                  (fn [m] (update m :val inc))
                                  (get-next 20)))))

(parallel-per :id
  (fn [m] (update m :val inc))
  test-finite-seq)


This is what you would do if you wanted to "chunk" it. You'd just use
group-by instead of partition-by. The difference is that parallel-per loses
the ability to process infinite sequences, as it is now mostly eager,
because group-by is eager. So you'd have to call it in a loop, passing it
the next chunk to process each time.





Re: Seeking a function to partially parallelize collection processing

2017-06-20 Thread Didier
Do you want something like this?

(ns dda.test)

(def test-infinite-lazy-seq
  (repeatedly (fn [] {:id (rand-int 2)
                      :val (rand-int 10)})))

(def test-finite-seq [{:id 1 :val 1}
                      {:id 1 :val 2}
                      {:id 3 :val 1}])

(defn parallel-per
  [k seqf ls]
  (pmap #(map seqf %) (partition-by #(k %) ls)))

(take 10 (parallel-per :id
                       (fn [m] (update m :val inc))
                       test-infinite-lazy-seq))

(parallel-per :id
              (fn [m] (update m :val inc))
              test-finite-seq)


It handles your simple example, and can also handle infinite sequences 
lazily, since I assumed your Kinesis stream would be infinite and you want 
to process things as they come through.

Now this only parallelizes groups that come through back to back. It is not
possible to do a group-by on ":id" over an infinite sequence, so the only thing
you could do better than this would be to chunk: take in batches of 100 from
the stream, group-by on each batch, and parallelize each group. I can try to
write a solution for that too if you want.


On Tuesday, 20 June 2017 11:57:59 UTC-7, Tom Connors wrote:
>
> Great, I'll watch that video. Thanks again.
>



Re: Seeking a function to partially parallelize collection processing

2017-06-20 Thread Tom Connors
Great, I'll watch that video. Thanks again.



Re: Seeking a function to partially parallelize collection processing

2017-06-20 Thread Justin Smith
Channel operations are IO, and intermixing them with processing leads to
code that is difficult to read and debug. core.async has facilities to help
you code more declaratively over channels. I think Timothy Baldridge's talk
at the last Clojure/West does a great job of presenting the issue:
https://www.youtube.com/watch?v=096pIlA3GDo
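
As one example of what "more declarative" can look like (a minimal sketch, not
from the talk; the channel names, buffer sizes, and parallelism are placeholders),
core.async's pipeline-blocking keeps the transformation in a plain transducer and
the channel plumbing in a single call:

(require '[clojure.core.async :as async])

(def records (async/chan 10))  ; source channel (placeholder)
(def results (async/chan 10))  ; destination channel (placeholder)

;; Four worker threads apply the transducer to items taken from `records` and
;; put the results on `results`; the logic itself never touches a channel.
(async/pipeline-blocking 4 results (map #(update % :val inc)) records)

Note this parallelizes uniformly and does not by itself give the per-:id
sequencing asked for in this thread; it only illustrates the style.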

On Tue, Jun 20, 2017 at 11:44 AM Tom Connors  wrote:

> Thanks, Justin. Regarding the mixing of program logic with channel io, I
> don't understand why that's a problem in this case or how it could be
> improved. Do you mind explaining that a bit more?
>



Re: Seeking a function to partially parallelize collection processing

2017-06-20 Thread Tom Connors
Thanks, Justin. Regarding the mixing of program logic with channel io, I
don't understand why that's a problem in this case or how it could be
improved. Do you mind explaining that a bit more?



Re: Seeking a function to partially parallelize collection processing

2017-06-20 Thread Justin Smith
Aside from style issues of mixing channel input/output with program logic,
and hiding the useful return value of go-loop, the real problem here is
doing your work inside a go block. Go blocks are not meant for blocking
tasks, whether CPU or IO bound; doing real work inside go blocks risks
starving all of core.async's implementation threads, which prevents all your
go blocks from doing work. Use async/thread to do work in a real thread and
park on the channel it returns.
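
A minimal sketch of that pattern, reusing the names from the code quoted below
(handler, val, status-chan, group-key, and next-blocker-chan come from Tom's snippet):

(async/go
  ;; async/thread runs (handler val) on its own thread and returns a channel
  ;; that will receive the result; <! merely parks this go block until then.
  (async/<! (async/thread (handler val)))
  (async/put! status-chan group-key)
  (async/close! next-blocker-chan))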

On Tue, Jun 20, 2017 at 10:15 AM Tom Connors  wrote:

> Thanks for the suggestion, Didier, but I was unable to find a way to make
> pmap work for my use case. For those interested, here's what I came up
> with, then some  questions:
>
> (defn parallel-per
>   "Handle records from input-chan in parallel, but records with matching
>   `splitter` return values serially."
>   [splitter handler input-chan]
>   (let [blockers (atom {}) ;; map of group-key to [chan num-remaining]
>         status-chan (async/chan)]
>     (async/go-loop []
>       (let [[val port] (async/alts! [input-chan status-chan])]
>         (if (= port input-chan)
>           (if (some? val)
>             (let [group-key (splitter val)]
>               (if-let [blocker (get @blockers group-key)]
>                 (let [[blocker-chan ^long num-remaining] blocker
>                       next-blocker-chan (async/chan)]
>                   (swap! blockers assoc group-key [next-blocker-chan (inc num-remaining)])
>                   (async/go
>                     (async/<! blocker-chan)
>                     (handler val)
>                     (async/put! status-chan group-key)
>                     (async/close! next-blocker-chan))
>                   (recur))
>                 (let [blocker-chan (async/chan)]
>                   (swap! blockers assoc group-key [blocker-chan 1])
>                   (async/go
>                     (handler val)
>                     (async/put! status-chan group-key)
>                     (async/close! blocker-chan))
>                   (recur))))
>             (async/close! status-chan))
>           (let [group-key val
>                 [_ ^long num-remaining] (get @blockers group-key)]
>             (if (> num-remaining 1)
>               (do
>                 (swap! blockers update-in [group-key 1] dec)
>                 (recur))
>               (do
>                 (swap! blockers dissoc group-key)
>                 (recur)))))))
>     nil))
>
> Does anything in here look bad? Is there a way to gracefully handle
> input-chan closing without using loop/recur? I'm using a new channel for
> every new record to block the next record with the same `splitter` return
> value - my first approach used one channel per distinct `splitter` value,
> but I saw some results printed out of order. Does this mean that the order
> of takes from 



Re: Seeking a function to partially parallelize collection processing

2017-06-20 Thread Tom Connors
Thanks for the suggestion, Didier, but I was unable to find a way to make 
pmap work for my use case. For those interested, here's what I came up 
with, then some  questions:

(defn parallel-per
  "Handle records from input-chan in parallel, but records with matching
  `splitter` return values serially."
  [splitter handler input-chan]
  (let [blockers (atom {}) ;; map of group-key to [chan num-remaining]
        status-chan (async/chan)]
    (async/go-loop []
      (let [[val port] (async/alts! [input-chan status-chan])]
        (if (= port input-chan)
          (if (some? val)
            (let [group-key (splitter val)]
              (if-let [blocker (get @blockers group-key)]
                ;; A record for a group that is already in flight: chain a new
                ;; blocker channel so this record runs after the previous one.
                (let [[blocker-chan ^long num-remaining] blocker
                      next-blocker-chan (async/chan)]
                  (swap! blockers assoc group-key [next-blocker-chan (inc num-remaining)])
                  (async/go
                    (async/<! blocker-chan)
                    (handler val)
                    (async/put! status-chan group-key)
                    (async/close! next-blocker-chan))
                  (recur))
                ;; First record for this group: handle it right away.
                (let [blocker-chan (async/chan)]
                  (swap! blockers assoc group-key [blocker-chan 1])
                  (async/go
                    (handler val)
                    (async/put! status-chan group-key)
                    (async/close! blocker-chan))
                  (recur))))
            (async/close! status-chan))
          ;; A completion arrived on status-chan: decrement or drop the group entry.
          (let [group-key val
                [_ ^long num-remaining] (get @blockers group-key)]
            (if (> num-remaining 1)
              (do
                (swap! blockers update-in [group-key 1] dec)
                (recur))
              (do
                (swap! blockers dissoc group-key)
                (recur)))))))
    nil))

Does anything in here look bad? Is there a way to gracefully handle 
input-chan closing without using loop/recur? I'm using a new channel for 
every new record to block the next record with the same `splitter` return 
value - my first approach used one channel per distinct `splitter` value, 
but I saw some results printed out of order. Does this mean that the order 
of takes from


Re: Seeking a function to partially parallelize collection processing

2017-06-19 Thread Didier
So, if I understand correctly, you need to have one function sequentially and 
lazily split the stream, then you want each split to be sequentially processed, 
but you'd like different splits to be processed in parallel.

I think for splitting, you could use (reductions), and then you could (pmap) 
over it.



Re: Seeking a function to partially parallelize collection processing

2017-06-19 Thread Tom Connors
Thanks Jose and Sam for the suggestions. I'm having some trouble figuring 
out the lifecycle for the channels created for each return value from the 
splitter function. I'll post my code once I have something I think works in 
case it's interesting to anyone in the future.



Re: Seeking a function to partially parallelize collection processing

2017-06-17 Thread Sam Raker
core.async pub/sub ?
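
Roughly, the idea might look like this (a hedged sketch only: the handler, the
fixed set of ids, and the example puts are placeholders; a real stream would
need to subscribe to topics dynamically as new ids appear):

(require '[clojure.core.async :as async])

(def handler (fn [m] (prn (update m :val inc)))) ; placeholder handler

(let [input (async/chan)
      p     (async/pub input :id)]          ; route each record by its :id
  (doseq [id [1 3]]                         ; ids known up front (placeholder)
    (let [c (async/chan)]
      (async/sub p id c)
      ;; One real thread per id: records sharing an :id stay sequential,
      ;; while records with different :ids are handled in parallel.
      (async/thread
        (loop []
          (when-some [m (async/<!! c)]
            (handler m)
            (recur))))))
  (async/put! input {:id 1 :val 1})
  (async/put! input {:id 1 :val 2})
  (async/put! input {:id 3 :val 1})
  (async/close! input))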




Re: Seeking a function to partially parallelize collection processing

2017-06-16 Thread Jose Figueroa Martinez
Hello Tom,

I think you are talking about distribution, not parallelization. As I see it
(sorry for not reading closely earlier), you want a way to handle
different things sequentially, where each sequence of things (already
grouped) is handled in a different thread.

You can put the things onto a specific channel (*core.async*) depending on
your *:id* (for example), and handle each specific channel on its
own thread (with a *future*, maybe).

I could be wrong, but if I understood correctly you have a fairly plausible
solution using *future*s and *core.async* channels.

Excuse me for not helping you more. I'm in a work meeting.


Regards.





Re: Seeking a function to partially parallelize collection processing

2017-06-16 Thread Tom Connors
Thanks Justin. My mistake. Point 2 stands.

On Friday, June 16, 2017 at 3:58:38 PM UTC-4, Justin Smith wrote:
>
> pmap is rarely actually useful, but point 1 is false, pmap doesn't require 
> that its input or output fit in memory
>



Re: Seeking a function to partially parallelize collection processing

2017-06-16 Thread Justin Smith
pmap is rarely actually useful, but point 1 is false, pmap doesn't require
that its input or output fit in memory
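
A quick illustration of that point: pmap is semi-lazy (it only keeps a bounded
window of work in flight), so it can consume an unbounded input:

(take 5 (pmap inc (range)))
;; => (1 2 3 4 5), even though (range) is infinite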

On Fri, Jun 16, 2017 at 12:52 PM Tom Connors  wrote:

> Hello Jose,
> Thank you for the response, but pmap does not address my use case. It's
> insufficient for two reasons: 1) the entire collection must fit in memory.
> My use case is handling records from a Kinesis stream. and 2) pmap
> parallelizes over the whole collection, whereas I want to parallelize the
> collection handling while handling subsets of the data sequentially, as I
> discussed in my first post.
> - Tom



Re: Seeking a function to partially parallelize collection processing

2017-06-16 Thread Tom Connors
Hello Jose,
Thank you for the response, but pmap does not address my use case. It's
insufficient for two reasons: 1) the entire collection must fit in memory
(my use case is handling records from a Kinesis stream), and 2) pmap
parallelizes over the whole collection, whereas I want to parallelize the
collection handling while handling subsets of the data sequentially, as I
discussed in my first post.
- Tom




Re: Seeking a function to partially parallelize collection processing

2017-06-16 Thread Jose Figueroa Martinez
Hello,

There are many videos on how to parallelize sequential processing on
ClojureTV, but the most basic way in Clojure, I think, is *pmap*.
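
For illustration only, applied to the example data from the original post
(this is uniform parallelism over the whole collection, without the per-:id
ordering the original poster asked about):

(pmap #(update % :val inc)
      [{:id 1 :val 1} {:id 1 :val 2} {:id 3 :val 1}])
;; => ({:id 1, :val 2} {:id 1, :val 3} {:id 3, :val 2})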

Regards.





Seeking a function to partially parallelize collection processing

2017-06-16 Thread Tom Connors
I'm looking for a function that would likely be named something like 
"sequential-by" or "parallel-per" that takes some data-producing thing like 
a lazy seq or a core async channel, a function to split records from that 
input, and a function to handle each item. Each item with an identical 
return value from the "split" function would be handled sequentially, while 
the handling of the collection as a whole would be parallel.

If we assume this signature:
(parallel-per splitter handler inputs)


Calling it like this:
(parallel-per :id
  (fn [m] (prn (update m :val inc)))
  [{:id 1 :val 1}, {:id 1 :val 2}, {:id 3 :val 1}])


Would result in the first two maps being handled sequentially, while the 
third map is handled in parallel with the first two. The order of the 
printed lines would probably be non-deterministic, except {:id 1 :val 2} 
would be printed before {:id 1 :val 3}.

Note that for my use case I don't care about return values, but if 
something like this already exists it's plausible that it returns something 
equivalent to (map handler inputs).

Does anything like this already exist in core or some lib? If not, any 
recommendations for how to build it?
