What a mess-up! I had already changed it to "persistent://", and I
misread "non-partitioned" in my server code as "non-persistent".
So my topics ARE already persistent, just non-partitioned (to save space?).
The rest of my original post still applies.
On 2023-01-24 09:03, Niclas Hedhman wrote:
Hi,
I am trying to set up topics to act as "message buffers" for web
clients.
I would like a client to be able to reconnect at any time and receive
the latest N messages (or, AFAIUI, some number of megabytes).
So, I think I need;
1. non-persistent topics
2. very long MessageTTL on Namespace used for this
3. RetentionPolicies with -1 time limit and some number of MB
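Item 3 could be applied to the namespace over the admin REST API. The following is only a minimal sketch, assuming the broker exposes the namespace retention endpoint at /admin/v2/namespaces/{tenant}/{namespace}/retention and accepts the JSON field names shown. The helper name newRetentionRequest, the admin URL, the tenant/namespace and the 512 MB figure are all hypothetical placeholders, and the request is only built here, not sent.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// retentionPolicies mirrors the JSON body the admin REST API is assumed to
// accept for namespace retention. Verify the field names against your broker.
type retentionPolicies struct {
	RetentionTimeInMinutes int `json:"retentionTimeInMinutes"`
	RetentionSizeInMB      int `json:"retentionSizeInMB"`
}

// newRetentionRequest (hypothetical helper) builds, but does not send, a POST
// that sets "-1 time limit and sizeMB megabytes" on the given namespace.
func newRetentionRequest(adminURL, tenant, namespace string, sizeMB int) (*http.Request, error) {
	body, err := json.Marshal(retentionPolicies{
		RetentionTimeInMinutes: -1, // no time limit, as in item 3
		RetentionSizeInMB:      sizeMB,
	})
	if err != nil {
		return nil, err
	}
	endpoint := fmt.Sprintf("%s/admin/v2/namespaces/%s/%s/retention",
		adminURL, url.PathEscape(tenant), url.PathEscape(namespace))
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	// Placeholder values; substitute the real admin URL and namespace.
	req, err := newRetentionRequest("http://localhost:8080", "public", "default", 512)
	if err != nil {
		fmt.Println("could not build request:", err)
		return
	}
	fmt.Println(req.Method, req.URL.String())
}
```

Sending it would be a matter of passing the request to http.DefaultClient.Do and checking the status code. The MessageTTL in item 2 would need a separate call, which is not shown here.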
The client in question is written in Go.
I am reading (see below) the topic, not subscribing.
Question 1;
When I do 'pulsar.EarliestMessageID()', should I be getting the oldest
message in the Retention buffer, or the oldest message not
acknowledged?
If the latter, will subsequent messages be read out, or will the same
client be served the same message over and over again?
Question 2;
Will RetentionPolicies ("size"/"time") cause messages to be kept in
RAM, or can I rely on messages being read from disk, even though they
are non-persistent?
TIA
Niclas
-o-o-o- Relevant code -o-o-o-
Creating reader;
func (p *PulsarClient) CreateReader(topic string, earliest bool) pulsar.Reader {
	// Start either from the oldest available message or only from new ones.
	var start pulsar.MessageID
	if earliest {
		start = pulsar.EarliestMessageID()
	} else {
		start = pulsar.LatestMessageID()
	}
	reader, err := p.client.CreateReader(pulsar.ReaderOptions{
		Topic:          topic,
		StartMessageID: start,
	})
	if err != nil {
		// Note: reader is nil in this case, so callers must check before using it.
		log.DefaultLogger.Error(fmt.Sprintf("Failed to create Pulsar Reader for: %s", topic), err)
	}
	return reader
}
Reading messages;
	reader := h.pulsar.CreateReader(model.NotificationTopics+strconv.FormatInt(orgId, 10), true)
	if reader == nil {
		// CreateReader logs the failure and returns nil; avoid calling Close() on nil.
		return fmt.Errorf("no Pulsar reader for org %d", orgId)
	}
	defer reader.Close()
	for {
		msg, err := reader.Next(ctx)
		if ctx.Err() != nil {
			// Context cancelled or timed out: stop streaming.
			log.DefaultLogger.Info("Grafana sender: DONE")
			return ctx.Err()
		}
		if err != nil {
			// Checked before touching msg, which is nil when Next fails.
			log.DefaultLogger.Error(fmt.Sprintf("Couldn't get the message via reader.Next(): %+v", err))
			continue
		}
		log.DefaultLogger.Info("Sending notification to " + req.PluginContext.User.Login)
		err = sender.SendJSON(msg.Payload())
		if err != nil {
			log.DefaultLogger.Error(fmt.Sprintf("Couldn't send frame: %v", err))
			return err
		}
	}
}