Hi Henry, On Thu, Sep 25, 2014 at 8:25 AM, Henrik Johansen < [email protected]> wrote:
> Just read an old blogpost of Nicolas Cellier's today that suggested > something like this, and thought it sounded interesting. > It's use would be for parallell operations that involve IO delays (the > example in the post was MCRepositoryGroup >> #includesVersionNamed:) > An important assumption would be that the blocks are free of side-effects, > so evaluation completion/order is not relied upon for further correct > operation. > > The goal would be to be able to write the method in question like this: > > includesVersionNamed: aString > " check for existing version name in parallel over all > repositories " > > ^self repositories threadedAnySatisfy: [:repository | repository > includesVersionNamed: aString ] > > while preserving the important property of anySatisfy: that is returns as > soon as an element evaluates true. > > Here's a sample implementation: > > Collection >> #threadedAnySatisfy: aBlock > "Evaluate aBlock with the elements of the receiver. > If aBlock returns true for any element return true. > Otherwise return false." > > | threads results processed | > threads := OrderedCollection new: self size. > results := AtomicSharedQueue new. > processed := 0. > self > do: [ :each | > threads > add: > ([ results nextPut: (aBlock value: > each) ] > newProcess > priority: Processor > activeProcess priority - 1) ]. > threads do: #resume. > [ processed = self size ] > whileFalse: [ > results next > ifTrue: [ > threads do: #terminate. > ^ true ]. > processed := processed + 1 ]. > ^ false > > Test: > > [test := #(2000 1000 ). > test threadedAnySatisfy: [:each | (Delay forMilliseconds: each) wait. > each = 1000]] timeToRun 1000 > test anySatisfy: [:each | (Delay forMilliseconds: each) wait. > each = 1000]] timeToRun 3000 > Cool. But please call it parallelAnySatisfy: or anySatisfyInParallel:. Are you sure there are no parallel hazards in the includesVersionNamed code? If one considers referencing AtomicSharedQueue in a Collection method bad > form, one could also use Arrays + explicit Semaphore, along the lines of: > > threadedAnySatisfy: aBlock > "Evaluate aBlock with the elements of the receiver. > If aBlock returns true for any element return true. > Otherwise return false." > > | threads results processed semaphore| > threads := Array new: self size. > results := Array new: self size withAll: false. > processed := 0. > self > inject: 1 into: [:index :each | > threads > at: index put: > ([ results at: index put: (aBlock > value: each) ] > newProcess > priority: Processor > activeProcess priority - 1). > index + 1 ]. > threads do: #resume. > [ processed = self size ] > whileFalse: [ semaphore wait. > (results anySatisfy: #yourself) > ifTrue: [ > threads do: #terminate. > ^ true ]. > processed := processed + 1 ]. > ^ false > > The cost of that is needing an anySatisfy: per signal received. (on an > array of booleans, so the cost isn't large, but still). > IMHO, the better option would be the version using a shared queue, but > putting it in a non-core package, say, Collections - Parallell. > > Anyone else think it'd be a useful addition? > > Cheers, > Henry > -- best, Eliot
