Re: Fiber and Thread Communication

2016-04-09 Thread Ali Çehreli via Digitalmars-d-learn

On 04/09/2016 07:45 AM, Nordlöw wrote:
> On Friday, 8 April 2016 at 10:51:49 UTC, Nordlöw wrote:

> AFAICT, it is not clear what are the limitations of the current
> std.concurrency and, from what. An illustrating example on task-based
> parallelism (such as the ones in jin.go) should partly alleviate this
> problem.

I don't know how much this helps but I was able to write an example that 
seems to work (elsewhere in this thread):


  http://forum.dlang.org/post/nec62k$26v7$1...@digitalmars.com

Ali



Re: Fiber and Thread Communication

2016-04-09 Thread Ali Çehreli via Digitalmars-d-learn

On 04/08/2016 02:42 PM, Dicebot wrote:

>> Thanks Dicebot. I don't think the included
>> std.concurrency.FiberScheduler has support for message passing because
>> FiberScheduler.spawn does not return a Tid. If so, I don't see how
>> it's possible to send messages between fibers.
>>
>> Ali
>
> Looks like a (funny) oversight.

Sorry, I misled you. :)

> Note that you get it for get fiber via
> https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1337
> (and FiberScheduler specifically extends Fiber to add ThreadInfo to it)
> but there is no clear way to pass that info to spawn host. I have a
> feeling that if that code is patched to simply provide Tid, message
> passing will just magically work. Needs to be checked though.

It turns out, instead of calling scheduler.spawn() directly, the program 
sets the __gshared 'scheduler' variable first and then calls spawn() as 
usual, which does return that fiber's Tid:


import std.stdio;
import std.concurrency;
import std.range;
import std.algorithm;

struct Done {
}

void workerTask(int id) {
    writefln("workerTask %s started", id);

    bool done = false;
    while (!done) {
        receive(
            (int message) {
                writefln("workerTask %s received %s", id, message);
                ownerTid.send(message * id);
            },
            (Done message) {
                writefln("workerTask %s received Done", id);
                done = true;
            });

        // Seems not to be needed:
        // scheduler.yield();
    }

    writefln("workerTask %s exiting", id);
}

void mainTask() {
    enum workerCount = 5;
    enum loopCount = 3;

    writeln("mainTask started");

    // spawn() uses the global 'scheduler', so each worker runs as a fiber
    auto workers = iota(workerCount)
                   .map!(id => spawn(&workerTask, id))
                   .array;

    foreach (i; 0 .. loopCount) {
        foreach (id, worker; workers) {
            worker.send(i);
            auto response = receiveOnly!int();
            assert(response == i * id);
            writefln("mainTask received %s", response);
        }
    }

    writeln("mainTask sending Done messages");

    foreach (worker; workers) {
        worker.send(Done());
    }

    writeln("mainTask exiting");
}

void main() {
    // Install the fiber scheduler before any call to spawn()
    scheduler = new FiberScheduler;
    scheduler.start({
        mainTask();
    });
}

Ali



Re: Fiber and Thread Communication

2016-04-09 Thread Nordlöw via Digitalmars-d-learn

On Friday, 8 April 2016 at 10:51:49 UTC, Nordlöw wrote:

> Are there any plans to unite

AFAICT, it is not clear what are the limitations of the current
std.concurrency and, from what. An illustrating example on
task-based parallelism (such as the ones in jin.go) should
partly alleviate this problem. Any ideas on what such an example
should contain and illustrate? Current limitations and plans for
fixing them should be described as well.


References:

http://forum.dlang.org/post/mailman.776.1459177268.26339.digitalmar...@puremagic.com
http://code.dlang.org/packages/jin-go
https://github.com/nin-jin/go.d

Further, who's up for the job of adding the missing parts to
std.concurrency using ideas and code from
https://github.com/nin-jin/go.d ? :)


Re: Fiber and Thread Communication

2016-04-08 Thread Dicebot via Digitalmars-d-learn

On Friday, 8 April 2016 at 20:25:11 UTC, Ali Çehreli wrote:
> On 04/08/2016 01:16 PM, Dicebot wrote:
>> On Friday, 8 April 2016 at 19:46:17 UTC, tcak wrote:
>>> On Friday, 8 April 2016 at 15:33:46 UTC, Dicebot wrote:
>>>> On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
>>>>> So a Tid can represent either a thread or a fiber?
>>>>
>>>> AFAIR, yes (I haven't used std.concurrency in a long while,
>>>> telling all from memory only).
>>>
>>> yes what? Thread or Fiber.
>>
>> Yes both :) Tid represents an abstract execution context, with no
>> implications about the underlying executor.
>
> Thanks Dicebot. I don't think the included
> std.concurrency.FiberScheduler has support for message passing
> because FiberScheduler.spawn does not return a Tid. If so, I
> don't see how it's possible to send messages between fibers.
>
> Ali

Looks like a (funny) oversight. Note that you get it for get
fiber via
https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1337
(and FiberScheduler specifically extends Fiber to add ThreadInfo to it),
but there is no clear way to pass that info to the spawn host. I have a
feeling that if that code is patched to simply provide a Tid, message
passing will just magically work. Needs to be checked though.


Re: Fiber and Thread Communication

2016-04-08 Thread Ali Çehreli via Digitalmars-d-learn

On 04/08/2016 01:16 PM, Dicebot wrote:
> On Friday, 8 April 2016 at 19:46:17 UTC, tcak wrote:
>> On Friday, 8 April 2016 at 15:33:46 UTC, Dicebot wrote:
>>> On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
>>>> So a Tid can represent either a thread or a fiber?
>>>
>>> AFAIR, yes (I haven't used std.concurrency in a long while, telling
>>> all from memory only).
>>
>> yes what? Thread or Fiber.
>
> Yes both :) Tid represents an abstract execution context, with no
> implications about the underlying executor.

Thanks Dicebot. I don't think the included
std.concurrency.FiberScheduler has support for message passing because
FiberScheduler.spawn does not return a Tid. If so, I don't see how it's
possible to send messages between fibers.

Ali



Re: Fiber and Thread Communication

2016-04-08 Thread Dicebot via Digitalmars-d-learn

On Friday, 8 April 2016 at 19:46:17 UTC, tcak wrote:
> On Friday, 8 April 2016 at 15:33:46 UTC, Dicebot wrote:
>> On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
>>> So a Tid can represent either a thread or a fiber?
>>
>> AFAIR, yes (I haven't used std.concurrency in a long while,
>> telling all from memory only).
>
> yes what? Thread or Fiber.

Yes both :) Tid represents an abstract execution context, with no
implications about the underlying executor.


Re: Fiber and Thread Communication

2016-04-08 Thread tcak via Digitalmars-d-learn

On Friday, 8 April 2016 at 15:33:46 UTC, Dicebot wrote:
> On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
>> So a Tid can represent either a thread or a fiber?
>
> AFAIR, yes (I haven't used std.concurrency in a long while,
> telling all from memory only).

yes what? Thread or Fiber.

---

Anyway, since a Fiber is not like a thread, and a thread starting
a Fiber is like calling a normal function, I guess the Tid still
represents the thread.


Re: Fiber and Thread Communication

2016-04-08 Thread Alex Parrill via Digitalmars-d-learn

On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
> So a Tid can represent either a thread or a fiber?


It represents a "logical thread", which currently consists of 
coroutines or OS threads but could theoretically be extended to, 
say, other processes or even other machines.


Re: Fiber and Thread Communication

2016-04-08 Thread Dicebot via Digitalmars-d-learn

On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
> So a Tid can represent either a thread or a fiber?


AFAIR, yes (I haven't used std.concurrency in a long while, 
telling all from memory only).


Re: Fiber and Thread Communication

2016-04-08 Thread Nordlöw via Digitalmars-d-learn

On Friday, 8 April 2016 at 13:15:07 UTC, Dicebot wrote:
> On Friday, 8 April 2016 at 11:18:11 UTC, Nordlöw wrote:
>> On Friday, 8 April 2016 at 11:01:21 UTC, Dicebot wrote:
>>> Doesn't std.concurrency support both right now? I remember
>>> seeing PR that adds message box support to fibers ages ago.
>
> See https://issues.dlang.org/show_bug.cgi?id=12090 and
> https://github.com/D-Programming-Language/phobos/pull/1910 for
> relevant code (as you can see it was merged several releases
> ago)
>
>> 1. What functions provide message box communication?
>
> The same ones as thread ones. API is completely transparent.
>
>> 2. But Fibers cannot currently be moved between threads right?
>
> Yes, and this is by design. It harms performance of concurrent
> apps.

So a Tid can represent either a thread or a fiber?


Re: Fiber and Thread Communication

2016-04-08 Thread Dicebot via Digitalmars-d-learn

On Friday, 8 April 2016 at 11:18:11 UTC, Nordlöw wrote:
> On Friday, 8 April 2016 at 11:01:21 UTC, Dicebot wrote:
>> Doesn't std.concurrency support both right now? I remember
>> seeing PR that adds message box support to fibers ages ago.

See https://issues.dlang.org/show_bug.cgi?id=12090 and
https://github.com/D-Programming-Language/phobos/pull/1910 for
relevant code (as you can see it was merged several releases ago)

> 1. What functions provide message box communication?

The same ones as thread ones. API is completely transparent.

> 2. But Fibers cannot currently be moved between threads right?

Yes, and this is by design. It harms performance of concurrent
apps.


Re: Fiber and Thread Communication

2016-04-08 Thread Nordlöw via Digitalmars-d-learn

On Friday, 8 April 2016 at 11:01:21 UTC, Dicebot wrote:
> Doesn't std.concurrency support both right now? I remember
> seeing PR that adds message box support to fibers ages ago.

What progress has been made since this post:

http://forum.dlang.org/post/k4jsef$26h6$1...@digitalmars.com


Re: Fiber and Thread Communication

2016-04-08 Thread Nordlöw via Digitalmars-d-learn

On Friday, 8 April 2016 at 11:01:21 UTC, Dicebot wrote:
> Doesn't std.concurrency support both right now? I remember
> seeing PR that adds message box support to fibers ages ago.


1. What functions provide message box communication?

2. But Fibers cannot currently be moved between threads right?


Re: Fiber and Thread Communication

2016-04-08 Thread Dicebot via Digitalmars-d-learn

On Friday, 8 April 2016 at 10:51:49 UTC, Nordlöw wrote:
> Are there any plans to unite
>
> fiber-to-fiber communication
>
> with
>
> thread-to-thread communication
>
> in Phobos?
>
> Does vibe.d give any solutions here?


Doesn't std.concurrency support both right now? I remember seeing 
PR that adds message box support to fibers ages ago.


Fiber and Thread Communication

2016-04-08 Thread Nordlöw via Digitalmars-d-learn

Are there any plans to unite

fiber-to-fiber communication

with

thread-to-thread communication

in Phobos?

Does vibe.d give any solutions here?


Re: Thread communication

2015-08-06 Thread Chris via Digitalmars-d-learn
On Thursday, 6 August 2015 at 08:40:58 UTC, Kagamin wrote:
> AFAIK, boost does it by integrating support for interruption
> into various functions, so IO, waits and locks reply to
> interrupt requests appropriately. You can do something similar.


I understand the philosophy behind D-threads. However, I have a
situation where waiting for a thread to react to an abort signal
(if it reacts at all) and finish according to a protocol can
cause a delay that may not be acceptable to the user or cause
inconsistencies. Instant abortion works best with data sharing.
However, then I have the ugly situation where I have to check the
abort flag at strategic places in several functions/blocks to
make sure the task will not be pursued, because you never know
when exactly the new input will arrive. In this way it can be
intercepted. Unfortunately, this is messy and it is not easy to
avoid data races.


A possible solution would be to halt all threads except for the
main thread, spawn a new thread, and end the old thread silently
behind the scenes. I'm not sure if this is possible though. I
also wonder if it would be possible to use some sort of observer
that never sleeps.


Re: Thread communication

2015-08-06 Thread Kagamin via Digitalmars-d-learn

On Tuesday, 4 August 2015 at 15:19:51 UTC, Chris wrote:
> foreach (ref i; 0..10)
> {
>   writefln("%d.\tDoing something with input %s", i+1, input);
>   Thread.sleep(500.msecs);
> }


AFAIK, boost does it by integrating support for interruption into 
various functions, so IO, waits and locks reply to interrupt 
requests appropriately. You can do something similar.


Re: Thread communication

2015-08-05 Thread thedeemon via Digitalmars-d-learn

On Tuesday, 4 August 2015 at 15:19:51 UTC, Chris wrote:
> I want to stop (and abort) the worker as soon as new input
> arrives. However, while executing the function that contains
> the foreach-loop the worker thread doesn't listen, because it's
> busy, of course.


I think this is a matter of architecture. If you want to use 
message-passing and you want the worker to react quickly to new 
events, this means it needs to check for new messages (via 
receiveTimeout) often enough, there's no way around it.
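
A minimal sketch of the polling approach described above, assuming the work can be cut into small chunks. The names `pollingWorker`, `runDemo` and `Exit` are hypothetical and not taken from the original program. A zero timeout makes `receiveTimeout` return immediately when no matching message is waiting. To keep the run deterministic, the demo queues `Exit` ahead of the start message, which is not how a real program would time it.

```d
import core.time : Duration;
import std.concurrency;
import std.stdio : writeln;

struct Exit {}

// Hypothetical worker: runs `steps` chunks of work, polling its mailbox
// before each chunk, then reports the number of completed chunks to its owner.
void pollingWorker()
{
    int steps;
    // Wait for the start message; a queued Exit does not match and stays queued.
    receive((int n) { steps = n; });

    int completed = 0;
    bool aborted = false;
    foreach (i; 0 .. steps)
    {
        // Non-blocking poll: returns immediately when no Exit is waiting.
        receiveTimeout(Duration.zero, (Exit e) { aborted = true; });
        if (aborted)
            break;
        // ... one small chunk of the real work would go here ...
        ++completed;
    }
    ownerTid.send(completed);
}

// Hypothetical driver; `cancel` queues an Exit before the work starts.
int runDemo(int steps, bool cancel)
{
    auto worker = spawn(&pollingWorker);
    if (cancel)
        worker.send(Exit());
    worker.send(steps);
    return receiveOnly!int();
}

void main()
{
    writeln("completed without cancel: ", runDemo(5, false));
    writeln("completed with cancel:    ", runDemo(5, true));
}
```

The smaller the chunk between two polls, the faster the worker reacts, at the cost of more mailbox checks.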




Re: Thread communication

2015-08-05 Thread Chris via Digitalmars-d-learn

On Tuesday, 4 August 2015 at 18:15:08 UTC, Ali Çehreli wrote:
> On 08/04/2015 09:19 AM, Dicebot wrote:
>> receiveTimeout
>
> I think the problem here is that the worker is busy, not even
> able to call that.
>
> This sounds like sending a signal to the specific thread (with
> pthread_kill()) but I don't know the details of it nor whether
> Phobos supports it.
>
> Ali


The problem is that it works up to a certain extent with 
receiveTimeout. However, if the input arrives in very short 
intervals, all the solutions I've come up with so far (including 
data sharing) fail sooner or later. New threads are spawned 
faster than old ones can be given the abort signal. There are 
ways to wait, till a given thread dies, say with a shared 
variable isAlive `while (isAlive) {}`, but even here I've come 
across problems when the input comes very fast.


I don't know how to solve this problem, because message passing 
follows a linear protocol (as far as I understand it) and shared 
variables give rise to data races. Something like pthread_kill() 
would indeed be useful, to terminate a thread at random. I wonder 
if fibers would be an option.


D-threads seem to be based on the assumption that there is no 
need to abort threads at random, any time. Or am I mistaken?


Re: Thread communication

2015-08-05 Thread Alex Parrill via Digitalmars-d-learn

On Wednesday, 5 August 2015 at 14:31:20 UTC, Marc Schütz wrote:
> Maybe we can lift this restriction if we know that the thread's
> main function is pure and takes no references to mutable data,
> because then it can by definition never mess up the program's
> state.


That'd be a pretty useless thread; how would it communicate 
results back to the main thread (or wherever it should go)?




Re: Thread communication

2015-08-05 Thread Marc Schütz via Digitalmars-d-learn

On Wednesday, 5 August 2015 at 11:23:28 UTC, Chris wrote:
> The problem is that it works up to a certain extent with
> receiveTimeout. However, if the input arrives in very short
> intervals, all the solutions I've come up with so far
> (including data sharing) fail sooner or later. New threads are
> spawned faster than old ones can be given the abort signal.
> There are ways to wait, till a given thread dies, say with a
> shared variable isAlive `while (isAlive) {}`, but even here
> I've come across problems when the input comes very fast.


You could use a thread pool, thereby limiting the number of 
threads that can run at any one time. But I guess you want the 
processing of new data to start as soon as possible, in which 
case that wouldn't help you.




> I don't know how to solve this problem, because message passing
> follows a linear protocol (as far as I understand it) and
> shared variables give rise to data races. Something like
> pthread_kill() would indeed be useful, to terminate a thread at
> random. I wonder if fibers would be an option.
>
> D-threads seem to be based on the assumption that there is no
> need to abort threads at random, any time. Or am I mistaken?


It was a conscious decision not to provide a kill method for 
threads, because it is impossible to guarantee that your program 
is still consistent afterwards. Maybe we can lift this 
restriction if we know that the thread's main function is pure 
and takes no references to mutable data, because then it can by 
definition never mess up the program's state. OTOH, the GC might 
be running at the time the thread is killed, which could again 
lead to inconsistencies...


Re: Thread communication

2015-08-05 Thread Marc Schütz via Digitalmars-d-learn

On Wednesday, 5 August 2015 at 14:34:42 UTC, Alex Parrill wrote:
> On Wednesday, 5 August 2015 at 14:31:20 UTC, Marc Schütz wrote:
>> Maybe we can lift this restriction if we know that the
>> thread's main function is pure and takes no references to
>> mutable data, because then it can by definition never mess up
>> the program's state.
>
> That'd be a pretty useless thread; how would it communicate
> results back to the main thread (or wherever it should go)?


It could return something. `std.concurrency.Tid` would have to be 
extended with a `join()` method that returns its result. Or we 
could somehow allow sending and receiving data.


Re: Thread communication

2015-08-05 Thread 岩倉 澪

On Wednesday, 5 August 2015 at 14:31:20 UTC, Marc Schütz wrote:
> It was a conscious decision not to provide a kill method for
> threads, because it is impossible to guarantee that your
> program is still consistent afterwards.


What about the situation where we want to kill worker threads off 
when closing a program? For example, I have a program with a 
thread that does some heavy computation in the background. When 
the application is closed, I want it to abort that computation, 
however I can't just slap a receiveTimeout in the worker thread 
because it is doing its work in a parallel foreach loop.
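
One possible approach for the parallel foreach case, sketched under the assumption that skipping the remaining iterations is an acceptable way to abort. The flag `stopRequested` and the function `crunch` are hypothetical names. Breaking out of a `std.parallelism` parallel foreach is not allowed, so each iteration checks a shared flag and skips its work instead; iterations that are already running still finish.

```d
import core.atomic : atomicLoad, atomicOp, atomicStore;
import std.parallelism : parallel;
import std.range : iota;
import std.stdio : writeln;

// Hypothetical flag; set it from the thread that handles application shutdown.
shared bool stopRequested = false;

// Hypothetical computation: visits `n` items in parallel and returns
// how many of them were actually processed.
size_t crunch(size_t n)
{
    shared size_t processed = 0;
    foreach (i; parallel(iota(n)))
    {
        // `break` is not allowed inside a parallel foreach, so skip instead.
        if (atomicLoad(stopRequested))
            continue;
        // ... heavy per-item work would go here ...
        atomicOp!"+="(processed, 1);
    }
    return atomicLoad(processed);
}

void main()
{
    writeln(crunch(1000));            // flag not set: every item is processed
    atomicStore(stopRequested, true);
    writeln(crunch(1000));            // flag set: every item is skipped
}
```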


Thread communication

2015-08-04 Thread Chris via Digitalmars-d-learn
Is there a good way to stop work-intensive threads via thread 
communication (instead of using a shared variable)? The example 
below is very basic and naive and only meant to exemplify the 
basic problem.


I want to stop (and abort) the worker as soon as new input 
arrives. However, while executing the function that contains the 
foreach-loop the worker thread doesn't listen, because it's busy, 
of course. I've tried a few solutions with send and receive in 
this block, but somehow none of them work perfectly.


//===
import std.stdio : readln, writefln, writeln;
import std.string : strip;
import std.concurrency;
import core.thread;

Tid thread1;

struct Exit {}

void main()
{
  string input;
  bool exists;
  while ((input = readln.strip) != null)
  {
    if (exists)
    {
      // Ask the previous worker to stop before starting a new one
      thread1.send(Exit());
    }
    thread1 = spawn(&worker);
    exists = true;
    thread1.send(input.idup);
  }
}

void worker()
{
  bool run = true;
  while (run)
  {
    receive(
      (string input)
      {
        // While this loop runs, the worker cannot see an Exit message
        foreach (ref i; 0..10)
        {
          writefln("%d.\tDoing something with input %s", i+1, input);
          Thread.sleep(500.msecs);
        }
        run = false;
      },
      (Exit exit)
      {
        run = false;
      }
    );
  }
  writeln("End of thread worker");
}
//===


Re: Thread communication

2015-08-04 Thread Dicebot via Digitalmars-d-learn

receiveTimeout


Re: Thread communication

2015-08-04 Thread Ali Çehreli via Digitalmars-d-learn

On 08/04/2015 09:19 AM, Dicebot wrote:
> receiveTimeout


I think the problem here is that the worker is busy, not even able to 
call that.


This sounds like sending a signal to the specific thread (with 
pthread_kill()) but I don't know the details of it nor whether Phobos 
supports it.


Ali



std.concurrency thread communication problem

2014-05-17 Thread Charles Hixson via Digitalmars-d-learn
I'm building a program which I intend to have many threads that can each send 
messages to (and receive messages from) each other.  The obvious way to do 
this would be to have a shared array of Tids, but this seems to not work.  I'm 
continually fighting the system to get it to compile, and this makes me think 
it should probably be done some other way...but what?

One possibility is to have each thread maintain a separate array that contains 
all the threads, which would mean that they would need to be initialized after 
they were created.  This would avoid the problems of shared Tids, but each Tid 
contains a private mailbox, so this would be being duplicated, and that 
bothers me...it seems like a poor idea.  (Maybe I'm wrong about that...but I 
don't know.)

I do know that I want an n by n communication matrix (leaving out the main 
thread), with each thread sending messages to all the others.  (Well, except 
for a few that I haven't really defined yet, but which handle separated 
functions.)  My plan was to have each thread run an execution loop which 
frequently checked for messages received in between performing its own 
functions.  They are not intended to synchronize with each other.  They are 
not intended to be temporary, i.e., each of these threads would be started 
shortly after program initialization, and continue running until program 
termination.  But how should I get them to know each other's address?

I don't want the main thread to need to act as a switchboard between all the 
others, though I guess that would sort of work.  (Actually, if I need to do 
that, that job would be pulled off into yet another thread...and I end up with 
more threads than processors.  Still, that's a design that is possible, IIUC.)

Any comments or suggestions?


Re: std.concurrency thread communication problem

2014-05-17 Thread John Colvin via Digitalmars-d-learn
On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via
Digitalmars-d-learn wrote:
> I'm building a program which I intend to have many threads that
> can each send messages to (and receive messages from) each other.
> The obvious way to do this would be to have a shared array of Tids,
> but this seems to not work.  I'm continually fighting the system to
> get it to compile, and this makes me think it should probably be
> done some other way...but what?
>
> One possibility is to have each thread maintain a separate
> array that contains all the threads, which would mean that they
> would need to be initialized after they were created.  This would
> avoid the problems of shared Tids, but each Tid contains a private
> mailbox, so this would be being duplicated, and that bothers
> me...it seems like a poor idea.  (Maybe I'm wrong about
> that...but I don't know.)


If my understanding is correct, each Tid contains a reference to 
the corresponding thread's MessageBox (implemented by way of 
MessageBox being a class), not an independent instance. You 
should be fine to just have an array of the relevant Tids in each 
thread.


Alternatively, a single __gshared array of threads should work, 
given you are sufficiently careful with it. Remember, if no-one 
is doing any writing then you don't need to do any 
synchronisation of reads.
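
A minimal sketch of the first suggestion (a plain array of Tids in each thread), assuming the peers are handed out as ordinary messages after all workers have been spawned. The names `peerWorker` and `exchangeTotal` are hypothetical. Because a `Tid` can itself be sent as a message, each worker builds its own thread-local array and no `shared` array is involved.

```d
import std.concurrency;
import std.stdio : writeln;

// Hypothetical worker: learns its peers from messages, sends its id to each
// peer, sums the ids it receives, and reports the sum to its owner.
void peerWorker(size_t id, size_t peerCount)
{
    Tid[] peers; // plain thread-local array; no `shared` and no casts
    while (peers.length < peerCount)
        receive((Tid t) { peers ~= t; }); // ids that arrive early stay queued

    foreach (peer; peers)
        peer.send(id);

    size_t sum = 0;
    foreach (k; 0 .. peerCount)
        receive((size_t from) { sum += from; });

    ownerTid.send(sum);
}

// Hypothetical driver: spawns `n` workers, introduces them to each other,
// and returns the total of the sums they report.
size_t exchangeTotal(size_t n)
{
    Tid[] workers;
    foreach (id; 0 .. n)
        workers ~= spawn(&peerWorker, id, n - 1);

    // Hand every worker the Tid of every other worker.
    foreach (i, w; workers)
        foreach (j, other; workers)
            if (i != j)
                w.send(other);

    size_t total = 0;
    foreach (k; 0 .. n)
        total += receiveOnly!size_t();
    return total;
}

void main()
{
    writeln(exchangeTotal(4));
}
```

The copies of a `Tid` held by different workers all refer to the same mailbox, so duplicating the array per thread does not duplicate any mailbox.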


Re: std.concurrency thread communication problem

2014-05-17 Thread Ali Çehreli via Digitalmars-d-learn

On 05/17/2014 12:33 PM, John Colvin wrote:
> On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via
> Digitalmars-d-learn wrote:
>> I'm building a program which I intend to have many threads that can
>> each send messages to (and receive messages from) each other.  The
>> obvious way to do this would be to have a shared array of Tids, but
>> this seems to not work.  I'm continually fighting the system to get
>> it to compile, and this makes me think it should probably be done
>> some other way...but what?
>>
>> One possibility is to have each thread maintain a separate array that
>> contains all the threads, which would mean that they would need to be
>> initialized after they were created.  This would avoid the problems
>> of shared Tids, but each Tid contains a private mailbox, so this
>> would be being duplicated, and that bothers me...it seems like a poor
>> idea.  (Maybe I'm wrong about that...but I don't know.)
>
> If my understanding is correct, each Tid contains a reference to the
> corresponding thread's MessageBox (implemented by way of MessageBox
> being a class), not an independent instance. You should be fine to just
> have an array of the relevant Tids in each thread.
>
> Alternatively, a single __gshared array of threads should work, given
> you are sufficiently careful with it. Remember, if no-one is doing any
> writing then you don't need to do any synchronisation of reads.


The following is what I've come up with. I had to use a number of 
shared-related casts.


import std.stdio;
import std.concurrency;
import std.datetime;
import std.random;
import core.thread;

enum threadCount = 5;
enum messagePerThread = 3;

// Represents messages sent to threads to start their tasks
struct Start
{}

// Receives the number (id) of this thread and the workers to send
// messages to
void workerFunc(size_t id, shared(Tid)[] workers)
{
    receiveOnly!Start();

    // A local function to reduce code duplication
    bool checkMessageForMe(Duration timeout)
    {
        return receiveTimeout(
            timeout,
            (size_t from) {
                writefln("%s received from %s", id, from);
            });
    }

    // My main task is to send messages to others:
    size_t totalSent = 0;
    while (totalSent < messagePerThread) {
        auto to = uniform(0, workers.length);

        // Only send to others; not to self
        if (to != id) {
            auto chosen = cast(Tid)workers[to];
            writefln("%s sending to %s", id, to);
            chosen.send(id);
            ++totalSent;
        }

        checkMessageForMe(0.seconds);
    }

    // Process trailing messages sent to me
    bool received = false;
    do {
        received = checkMessageForMe(10.msecs);
    } while (received);
}

void main()
{
    auto workers = new shared(Tid)[threadCount];

    foreach (id; 0 .. threadCount) {
        auto worker = spawn(&workerFunc, id, workers);
        workers[id] = cast(shared(Tid))worker;
    }

    foreach (sharedWorker; workers) {
        auto worker = cast(Tid)sharedWorker;
        worker.send(Start());
    }

    thread_joinAll();
}

Sample output:

0 sending to 2
4 sending to 3
4 sending to 2
1 sending to 4
3 received from 4
3 sending to 2
0 sending to 1
4 received from 1
1 received from 0
1 sending to 0
0 received from 1
0 sending to 1
1 received from 0
1 sending to 0
0 received from 1
3 sending to 2
4 sending to 2
2 sending to 0
2 received from 0
2 received from 4
3 sending to 1
2 sending to 3
0 received from 2
1 received from 3
2 received from 3
2 sending to 0
3 received from 2
0 received from 2
2 received from 3
2 received from 4

Ali



Re: std.concurrency thread communication problem

2014-05-17 Thread Charles Hixson via Digitalmars-d-learn
On Saturday, May 17, 2014 12:59:22 PM Ali Çehreli via Digitalmars-d-learn 
wrote:
> The following is what I've come up with. I had to use a number of
> shared-related casts.
>
> [...]

Thank you immensely.  That is precisely the kind of information I was hoping 
for.