Hi,
I am trying to set up topics to act as "message buffers" for web clients.

I would like a client to be able to reconnect at any time and receive the latest N messages (or, AFAIUI, the latest messages up to some number of megabytes).

So, I think I need:
1. non-persistent topics
2. a very long MessageTTL on the namespace used for this
3. RetentionPolicies with a -1 time limit and a size limit of some number of MB
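
To make items 2 and 3 concrete, here is a minimal sketch of how I understand these settings would be applied through the broker's admin REST API. The admin URL, the tenant/namespace names, the sizes and the helper names are placeholders I made up for illustration; the requests are only built and printed, not sent. Whether these settings have any effect on non-persistent topics is part of what I am unsure about.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"strconv"
	"strings"
)

// retentionPolicy mirrors the JSON body I believe the namespace
// retention endpoint expects.
type retentionPolicy struct {
	RetentionTimeInMinutes int `json:"retentionTimeInMinutes"`
	RetentionSizeInMB      int `json:"retentionSizeInMB"`
}

// retentionBody encodes "no time limit (-1), at most sizeMB megabytes".
func retentionBody(sizeMB int) ([]byte, error) {
	return json.Marshal(retentionPolicy{RetentionTimeInMinutes: -1, RetentionSizeInMB: sizeMB})
}

// newRetentionRequest builds (but does not send) the request that sets
// the retention policy on a namespace.
func newRetentionRequest(adminURL, tenant, namespace string, sizeMB int) (*http.Request, error) {
	body, err := retentionBody(sizeMB)
	if err != nil {
		return nil, err
	}
	url := fmt.Sprintf("%s/admin/v2/namespaces/%s/%s/retention", adminURL, tenant, namespace)
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

// newMessageTTLRequest builds (but does not send) the request that sets
// the message TTL, in seconds, on a namespace.
func newMessageTTLRequest(adminURL, tenant, namespace string, ttlSeconds int) (*http.Request, error) {
	url := fmt.Sprintf("%s/admin/v2/namespaces/%s/%s/messageTTL", adminURL, tenant, namespace)
	req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(strconv.Itoa(ttlSeconds)))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	// Placeholder values: local broker, default namespace, 100 MB, one year.
	retention, err := newRetentionRequest("http://localhost:8080", "public", "default", 100)
	if err != nil {
		fmt.Println("building retention request failed:", err)
		return
	}
	ttl, err := newMessageTTLRequest("http://localhost:8080", "public", "default", 365*24*3600)
	if err != nil {
		fmt.Println("building TTL request failed:", err)
		return
	}
	fmt.Println(retention.Method, retention.URL)
	fmt.Println(ttl.Method, ttl.URL)
}
```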

The client in question is written in Go.

I am reading the topic with a Reader (see below), not subscribing.

Question 1:
When I start the reader at 'pulsar.EarliestMessageID()', should I be getting the oldest message in the retention buffer, or the oldest unacknowledged message?

If the latter, will subsequent messages be read out, or will the same client be served the same message over and over again?

Question 2:
Will RetentionPolicies ("size"/"time") cause messages to be kept in RAM, or can I rely on messages being read from disk, even though the topics are non-persistent?


TIA
Niclas


-o-o-o- Relevant code -o-o-o-
Creating the reader:

func (p *PulsarClient) CreateReader(topic string, earliest bool) pulsar.Reader {
    var start pulsar.MessageID
    if earliest {
        start = pulsar.EarliestMessageID()
    } else {
        start = pulsar.LatestMessageID()
    }
    reader, err := p.client.CreateReader(pulsar.ReaderOptions{
        Topic:          topic,
        StartMessageID: start,
    })
    if err != nil {
        log.DefaultLogger.Error(fmt.Sprintf("Failed to create Pulsar Reader for: %s", topic), err)
    }
    return reader
}


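Reading messages:
    reader := h.pulsar.CreateReader(model.NotificationTopics+strconv.FormatInt(orgId, 10), true)
    if reader == nil {
        // CreateReader logs the failure and returns nil, so bail out here
        // rather than calling Close() on a nil reader.
        return fmt.Errorf("no Pulsar reader available")
    }
    defer reader.Close()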
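    for {
        msg, err := reader.Next(ctx)
        if ctx.Err() != nil {
            // Context cancelled or timed out: the stream is finished.
            log.DefaultLogger.Info("Grafana sender: DONE")
            return ctx.Err()
        }
        if err != nil {
            // Check err before using msg, which is nil on failure.
            // Return rather than retry, to avoid spinning on a permanent error.
            log.DefaultLogger.Error(fmt.Sprintf("Couldn't get the message via reader.Next(): %+v", err))
            return err
        }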
        log.DefaultLogger.Info("Sending notification to " + req.PluginContext.User.Login)
        err = sender.SendJSON(msg.Payload())
        if err != nil {
log.DefaultLogger.Error(fmt.Sprintf("Couldn't send frame: %v", err))
            return err
        }
    }
}
