Hi,
I am trying to set up topics to act as "message buffers" for web clients.
I would like a client to be able to re-connect at any time and receive
the latest N messages (or, AFAIUI, the latest N megabytes' worth).
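To make the goal concrete, here is a minimal sketch in plain Go of the behaviour I am after. It is not Pulsar code: `ReplayBuffer`, `Append` and `Snapshot` are hypothetical names of my own, and it bounds the buffer by message count rather than by megabytes.

```go
package main

import "fmt"

// ReplayBuffer is a hypothetical in-memory stand-in for the behaviour
// wanted from the topic: it keeps only the newest `limit` messages.
type ReplayBuffer struct {
	limit    int
	messages [][]byte
}

// NewReplayBuffer creates a buffer that retains at most limit messages.
func NewReplayBuffer(limit int) *ReplayBuffer {
	return &ReplayBuffer{limit: limit}
}

// Append stores a message and drops the oldest ones once the limit is exceeded.
func (b *ReplayBuffer) Append(payload []byte) {
	b.messages = append(b.messages, payload)
	if len(b.messages) > b.limit {
		b.messages = b.messages[len(b.messages)-b.limit:]
	}
}

// Snapshot returns what a (re)connecting client should receive:
// every retained message, oldest first.
func (b *ReplayBuffer) Snapshot() [][]byte {
	out := make([][]byte, len(b.messages))
	copy(out, b.messages)
	return out
}

func main() {
	buf := NewReplayBuffer(3)
	for i := 1; i <= 5; i++ {
		buf.Append([]byte(fmt.Sprintf("msg-%d", i)))
	}
	// A client connecting now should see msg-3, msg-4 and msg-5.
	for _, m := range buf.Snapshot() {
		fmt.Println(string(m))
	}
}
```

In other words, a client connecting at any point gets the retained tail of the stream, regardless of what any earlier client has already read.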
So, I think I need:
1. non-persistent topics
2. a very long MessageTTL on the namespace used for this
3. RetentionPolicies with a time limit of -1 and a size limit of some number of MB
The client in question is written in Go.
I am reading the topic with a Reader (see code below), not subscribing.
Question 1:
When I start the reader at 'pulsar.EarliestMessageID()', should I be getting the oldest
message in the Retention buffer, or the oldest message not acknowledged?
If the latter, will subsequent messages be read out, or will the same
client be served the same message over and over again?
Question 2:
Will RetentionPolicies ("size"/"time") cause messages to be kept in RAM,
or can I rely on messages being read from disk, even though they are
non-persistent?
TIA
Niclas
-o-o-o- Relevant code -o-o-o-
Creating the reader:
func (p *PulsarClient) CreateReader(topic string, earliest bool) pulsar.Reader {
	var start pulsar.MessageID
	if earliest {
		start = pulsar.EarliestMessageID()
	} else {
		start = pulsar.LatestMessageID()
	}
	reader, err := p.client.CreateReader(pulsar.ReaderOptions{
		Topic:          topic,
		StartMessageID: start,
	})
	if err != nil {
		log.DefaultLogger.Error(fmt.Sprintf("Failed to create Pulsar Reader for: %s", topic), err)
	}
	return reader
}
Reading messages:
	reader := h.pulsar.CreateReader(model.NotificationTopics+strconv.FormatInt(orgId, 10), true)
	defer reader.Close()
	for {
		msg, err := reader.Next(ctx)
		if err != nil {
			// Check the error first: when Next fails, msg is nil as well.
			if ctx.Err() != nil {
				// Context cancelled or timed out, so the client is gone.
				log.DefaultLogger.Info("Grafana sender: DONE")
				return ctx.Err()
			}
			log.DefaultLogger.Error(fmt.Sprintf("Couldn't get the message via reader.Next(): %+v", err))
			continue
		}
		log.DefaultLogger.Info("Sending notification to " + req.PluginContext.User.Login)
		err = sender.SendJSON(msg.Payload())
		if err != nil {
			log.DefaultLogger.Error(fmt.Sprintf("Couldn't send frame: %v", err))
			return err
		}
	}
}