Hi Henry,

On Thu, Sep 25, 2014 at 8:25 AM, Henrik Johansen <
[email protected]> wrote:

> Just read an old blogpost of Nicolas Cellier's today that suggested
> something like this, and thought it sounded interesting.
> It's use would be for parallell operations that involve IO delays (the
> example in the post was MCRepositoryGroup >> #includesVersionNamed:)
> An important assumption would be that the blocks are free of side-effects,
> so evaluation completion/order is not relied upon for further correct
> operation.
>
> The goal would be to be able to write the method in question like this:
>
> includesVersionNamed: aString
>         " check for existing version name in parallel over all
> repositories "
>
>         ^self repositories threadedAnySatisfy: [:repository | repository
> includesVersionNamed: aString ]
>
> while preserving the important property of anySatisfy: that is returns as
> soon as an element evaluates true.
>
> Here's a sample implementation:
>
> Collection >> #threadedAnySatisfy: aBlock
>         "Evaluate aBlock with the elements of the receiver.
>         If aBlock returns true for any element return true.
>         Otherwise return false."
>
>         | threads results processed |
>         threads := OrderedCollection new: self size.
>         results := AtomicSharedQueue new.
>         processed := 0.
>         self
>                 do: [ :each |
>                         threads
>                                 add:
>                                         ([ results nextPut: (aBlock value:
> each) ]
>                                                 newProcess
>                                                 priority: Processor
> activeProcess priority - 1) ].
>         threads do: #resume.
>         [ processed = self size ]
>                 whileFalse: [
>                         results next
>                                 ifTrue: [
>                                         threads do: #terminate.
>                                         ^ true ].
>                         processed := processed + 1 ].
>         ^ false
>
> Test:
>
> [test := #(2000 1000 ).
> test threadedAnySatisfy: [:each | (Delay forMilliseconds: each) wait.
>                                         each = 1000]] timeToRun 1000
> test anySatisfy: [:each | (Delay forMilliseconds: each) wait.
>                                         each = 1000]] timeToRun  3000
>

Cool.  But please call it parallelAnySatisfy: or anySatisfyInParallel:.
Are you sure there are no parallel hazards in the includesVersionNamed code?

If one considers referencing AtomicSharedQueue in a Collection method bad
> form, one could also use Arrays + explicit Semaphore, along the lines of:
>
> threadedAnySatisfy: aBlock
>         "Evaluate aBlock with the elements of the receiver.
>         If aBlock returns true for any element return true.
>         Otherwise return false."
>
>         | threads results processed semaphore|
>         threads := Array new: self size.
>         results := Array new: self size withAll: false.
>         processed := 0.
>         self
>                 inject: 1 into: [:index :each |
>                         threads
>                                 at: index put:
>                                         ([ results at: index put: (aBlock
> value: each) ]
>                                                 newProcess
>                                                 priority: Processor
> activeProcess priority - 1).
>                                         index + 1 ].
>         threads do: #resume.
>         [ processed = self size ]
>                 whileFalse: [ semaphore wait.
>                         (results anySatisfy: #yourself)
>                                 ifTrue: [
>                                         threads do: #terminate.
>                                         ^ true ].
>                         processed := processed + 1 ].
>         ^ false
>
> The cost of that is needing an anySatisfy: per signal received. (on an
> array of booleans, so the cost isn't large, but still).
> IMHO, the better option would be the version using a shared queue, but
> putting it in a non-core package, say, Collections - Parallell.
>
> Anyone else think it'd be a useful addition?
>
> Cheers,
> Henry
>



-- 
best,
Eliot

Reply via email to