Today I read an old blog post by Nicolas Cellier that suggested something 
like this, and thought it sounded interesting.
Its use would be for parallel operations that involve IO delays (the example 
in the post was MCRepositoryGroup >> #includesVersionNamed:).
An important assumption is that the blocks are free of side effects, so that 
correct operation does not rely on the order in which the evaluations complete, 
or on all of them completing.

The goal would be to be able to write the method in question like this:

includesVersionNamed: aString
        " check for existing version name in parallel over all repositories "

        ^self repositories threadedAnySatisfy: [:repository | repository includesVersionNamed: aString ]

while preserving the important property of anySatisfy: that it returns as soon 
as an element evaluates to true.

Here's a sample implementation:

Collection >> #threadedAnySatisfy: aBlock
        "Evaluate aBlock with the elements of the receiver.
        If aBlock returns true for any element return true.
        Otherwise return false."

        | threads results processed |
        threads := OrderedCollection new: self size.
        results := AtomicSharedQueue new.
        processed := 0.
        self
                do: [ :each | 
                        threads
                                add:
                                        ([ results nextPut: (aBlock value: each) ]
                                                newProcess
                                                priority: Processor activeProcess priority - 1) ].
        threads do: #resume.
        [ processed = self size ]
                whileFalse: [ 
                        results next
                                ifTrue: [ 
                                        threads do: #terminate.
                                        ^ true ].
                        processed := processed + 1 ].
        ^ false
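One case this implementation does not cover: an Error signalled inside aBlock is not handled in the worker process, so that worker never puts a result into the queue, and if no other element answers true the caller waits in "results next" indefinitely. A minimal sketch of one way to guard against that, assuming it is acceptable to count a failing element as false (the selector #safeThreadedAnySatisfy: is hypothetical):

```smalltalk
Collection >> #safeThreadedAnySatisfy: aBlock
	"Sketch; the selector is hypothetical. Same as #threadedAnySatisfy:, except that an
	Error signalled inside aBlock is caught in the worker and counted as false, so every
	worker puts exactly one result into the queue and the caller cannot wait forever."

	| threads results processed |
	threads := OrderedCollection new: self size.
	results := AtomicSharedQueue new.
	processed := 0.
	self do: [ :each |
		threads add: ([ results nextPut: ([ aBlock value: each ]
						on: Error
						do: [ :error | false ]) ]
					newProcess priority: Processor activeProcess priority - 1) ].
	threads do: #resume.
	[ processed = self size ] whileFalse: [
		results next ifTrue: [
			threads do: #terminate.
			^ true ].
		processed := processed + 1 ].
	^ false
```

Whether swallowing the error is right is a design choice; a caller that needs to see failures would instead have to pass the exception back through the queue and re-signal it.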

Test:

| test |
test := #(2000 1000).
[ test threadedAnySatisfy: [ :each | (Delay forMilliseconds: each) wait.
                                        each = 1000 ] ] timeToRun.   "about 1000 ms"
[ test anySatisfy: [ :each | (Delay forMilliseconds: each) wait.
                                        each = 1000 ] ] timeToRun.   "about 3000 ms"



If one considers referencing AtomicSharedQueue in a Collection method bad form, 
one could also use Arrays + explicit Semaphore, along the lines of:

threadedAnySatisfy: aBlock
        "Evaluate aBlock with the elements of the receiver.
        If aBlock returns true for any element return true.
        Otherwise return false."

        | threads results processed semaphore |
        threads := Array new: self size.
        results := Array new: self size withAll: false.
        processed := 0.
        semaphore := Semaphore new.
        self
                inject: 1 into: [:index :each | 
                        threads
                                at: index put:
                                        ([ results at: index put: (aBlock value: each).
                                           "wake the waiting process so it re-checks results"
                                           semaphore signal ]
                                                newProcess
                                                priority: Processor activeProcess priority - 1).
                                        index + 1 ].
        threads do: #resume.
        [ processed = self size ]
                whileFalse: [ semaphore wait.
                        (results anySatisfy: #yourself)
                                ifTrue: [ 
                                        threads do: #terminate.
                                        ^ true ].
                        processed := processed + 1 ].
        ^ false

The cost of that is an anySatisfy: scan over the results array for every signal 
received (up to n scans of n elements for a collection of size n; they are 
Booleans, so the cost isn't large, but still).
IMHO, the better option would be the version using a shared queue, but placed 
in a non-core package, say, Collections - Parallel.
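If one does stay with a Semaphore, the per-signal scan can be avoided by having each worker record a hit in a single shared flag before it signals, so the waiting process tests one variable per wake-up. A minimal sketch, assuming side-effect-free blocks as above; the selector #threadedAnySatisfyWithFlag: is hypothetical:

```smalltalk
Collection >> #threadedAnySatisfyWithFlag: aBlock
	"Sketch; the selector is hypothetical. Like the Semaphore version, but each worker
	records a hit in one shared flag, so no array of results has to be scanned."

	| threads found semaphore processed |
	threads := OrderedCollection new: self size.
	found := false.
	semaphore := Semaphore new.
	processed := 0.
	self do: [ :each |
		threads add: ([ "Set the flag before signalling, so the woken process sees it."
				(aBlock value: each) ifTrue: [ found := true ].
				semaphore signal ]
					newProcess priority: Processor activeProcess priority - 1) ].
	threads do: #resume.
	[ processed = self size ] whileFalse: [
		semaphore wait.
		found ifTrue: [
			threads do: #terminate.
			^ true ].
		processed := processed + 1 ].
	^ false
```

Because every worker signals exactly once and sets the flag before signalling, the woken process always sees a hit from the worker that woke it; a late write from another worker can only change the flag from false to true, which is harmless.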

Anyone else think it'd be a useful addition?

Cheers,
Henry
