I would add that it's more volatile that you think:

There is *no buffer* in memory as you would imagine.

I quote from documentation here
<https://pulsar.apache.org/docs/2.11.x/concepts-messaging/#non-persistent-topics>
:

With non-persistent topics, message data lives only in memory, without a
specific buffer - which means data *is not* buffered in memory. The
received messages are immediately transmitted to all *connected consumers*.
If a message broker fails or message data can otherwise not be retrieved
from memory, your message data may be lost. Use non-persistent topics only
if you're *certain* that your use case requires it and can sustain it.

Meaning, as soon as the message is received, it is immediately pushed to
the socket of each connected consumer.
If your client is a consumer and it is disconnected, once reconnected it
will start receiving messages from this point on.

Honestly I find this non persistent topic quite confusing.


On Tue, Jan 24, 2023 at 10:07 AM Enrico Olivelli <eolive...@gmail.com>
wrote:

> Niclas,
>
>
> Il giorno mar 24 gen 2023 alle ore 09:04 Niclas Hedhman
> <nic...@hedhman.org> ha scritto:
> >
> > Hi,
> > I am trying to set up topics to act as "message buffers" to web clients.
> >
> > I would like the client to be able to re-connect at any time, and have
> > the latest N messages (or AFAIUI, some number of Megabytes).
> >
> > So, I think I need;
> > 1. non-persistent topics
> > 2. very long MessageTTL on Namespace used for this
> > 3. RetentionPolicies with -1 time limit and some number of MB
> >
> > Client in question is GO.
> >
> > I am reading (see below) the topic, not subscribing.
> >
> > Question 1;
> > When I do 'pulsar.EarliestMessageID()', should I be getting the oldest
> > message in the Retention buffer, or the oldest message not acknowledged?
> >
> > If the latter, will subsequent messages be read out, or will the same
> > client be served the same message over and over again?
> >
> > Question 2;
> > Will RetentionPolicies ("size"/"time") cause messages to be kept in RAM,
> > or can I rely on messages being read from disk, even though they are
> > non-persistent?
>
>
> A non-persistent topic is volatile: if you restart the broker you lose the
> data
>
>
> Enrico
>
>
> >
> >
> > TIA
> > Niclas
> >
> >
> > -o-o-o- Relevant code -o-o-o-
> > Creating reader;
> >
> > func (p *PulsarClient) CreateReader(topic string, earliest bool)
> > pulsar.Reader {
> >      var start pulsar.MessageID
> >      if earliest {
> >          start = pulsar.EarliestMessageID()
> >      } else {
> >          start = pulsar.LatestMessageID()
> >      }
> >      reader, err := p.client.CreateReader(pulsar.ReaderOptions{
> >          Topic:          topic,
> >          StartMessageID: start,
> >      })
> >      if err != nil {
> >          log.DefaultLogger.Error(fmt.Sprintf("Failed to create Pulsar
> > Reader for: %s", topic), err)
> >      }
> >      return reader
> > }
> >
> >
> > Reading messages;
> >      reader :=
> > h.pulsar.CreateReader(model.NotificationTopics+strconv.FormatInt(orgId,
> > 10), true)
> >      defer reader.Close()
> >      for {
> >          msg, err := reader.Next(ctx)
> >          if msg == nil {
> >              log.DefaultLogger.Info("Grafana sender: DONE")
> >              return ctx.Err()
> >          }
> >          if err != nil {
> >              log.DefaultLogger.Error(fmt.Sprintf("Couldn't get the
> > message via reader.Next(): %+v", err))
> >              continue
> >          }
> >          log.DefaultLogger.Info("Sending notification to " +
> > req.PluginContext.User.Login)
> >          err = sender.SendJSON(msg.Payload())
> >          if err != nil {
> >              log.DefaultLogger.Error(fmt.Sprintf("Couldn't send frame:
> > %v", err))
> >              return err
> >          }
> >      }
> > }
>

Reply via email to