It is hard to know what is failing without a complete example. I pieced this
together from the various snippets above, and it works fine:
import times, asyncstreams, asyncdispatch
type
  MsgKind = enum ## Tag selecting which payload field a `Message` holds.
    mkString, mkInt, mkFloat

  Message* = object
    ## One item travelling over a `FutureStream`; the payload is a variant
    ## selected by `msg`.
    topic*: string
    id*: string
    msgtype*: string ## "p/s", "req/rep" or "reply"; "req/rep" requires `id`.
    case msg*: MsgKind
    of mkString:
      strVal: string
    of mkInt:
      intVal: int
    of mkFloat:
      floatVal: float

  Subscriber* = object
    ## Holds a message stream together with topic lists and identifying
    ## strings.
    fs*: FutureStream[Message]
    subscriptions*: seq[string] ## Topic strings.
    topics*: seq[string]        ## Topic strings.
    source*: string             ## Name of proc.
    id*: string

  Bus* = object
    ## Wrapper around the single stream that `Message` values are written to.
    fsBus*: FutureStream[Message]
# Module-level bus instance; its stream is created with "main" recorded as
# the originating proc name.
var bus = Bus(
  fsBus: newFutureStream[Message](fromProc = "main")
)
proc post*(topic: string, msg: string|int|float, id: string = "",
           msgtype: string = "p/s") {.async.} =
  ## Wraps `msg` in a `Message` and writes it to `bus.fsBus`.
  ##
  ## * `topic`   - stored in the message's `topic` field.
  ## * `msg`     - payload; its static type picks the variant:
  ##   `string` -> `mkString`, `int` -> `mkInt`, `float` -> `mkFloat`.
  ## * `id`      - stored in the message's `id` field (defaults to "").
  ## * `msgtype` - stored in the message's `msgtype` field (defaults to
  ##   "p/s").
  ##
  ## The returned future completes once the write to the stream completes.
  let message =
    # `when` is resolved per instantiation, so only the branch matching the
    # payload type is compiled.
    when msg is string:
      Message(
        topic: topic,
        msg: mkString,
        strVal: msg,
        id: id,
        msgtype: msgtype
      )
    elif msg is int:
      Message(
        topic: topic,
        msg: mkInt,
        intVal: msg,
        id: id,
        msgtype: msgtype
      )
    elif msg is float:
      # The discriminator must be `mkFloat`: `floatVal` only exists in that
      # branch of the variant, so tagging it as `mkString` is rejected.
      Message(
        topic: topic,
        msg: mkFloat,
        floatVal: msg,
        id: id,
        msgtype: msgtype
      )
  await bus.fsBus.write(message)
proc tenSeconds() {.async.} =
  ## Loops forever: posts the current UTC time as a formatted string on the
  ## "time" topic, then sleeps for ten seconds before posting again.
  const pauseMs = 1000 * 10
  while true:
    let stamp = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
    await post("time", stamp)
    await sleepAsync(pauseMs)
proc echoTime() {.async.} =
  ## Loops forever reading from `bus.fsBus`; every message that arrives is
  ## echoed in full, followed by its `strVal` payload.
  while true:
    let (gotOne, received) = await bus.fsBus.read()
    if not gotOne:
      # Nothing was retrieved: yield to the dispatcher and try again.
      await sleepAsync(0)
      continue
    echo received
    echo received.strVal
# Start the producer without waiting on it; asyncCheck surfaces any error
# its future ends with.
asyncCheck(tenSeconds())
# Drive the event loop from the consumer, which loops forever.
waitFor(echoTime())
Run
You appear to have built some kind of subscriber system, though; without any
details about how it works, it's hard to reproduce it.