On Tue 2019-07-09 12:21:01, John Ogness wrote:
> On 2019-07-09, Petr Mladek <[email protected]> wrote:
> >>>> 1. The code claims that the cmpxchg(seq_newest) in
> >>>> prb_reserve_desc() guarantees that "The descriptor is ours until
> >>>> the COMMITTED bit is set."  This is not true if in that wind
> >>>> seq_newest wraps, allowing another writer to gain ownership of the
> >>>> same descriptor. For small descriptor arrays (such as in my test
> >>>> module), this situation is quite easy to reproduce.
> >>>
> >> Let me inline the function are talking about and add commentary to
> >> illustrate what I am saying:
> >> 
> >> static int prb_reserve_desc(struct prb_reserved_entry *entry)
> >> {
> >>    unsigned long seq, seq_newest, seq_prev_wrap;
> >>    struct printk_ringbuffer *rb = entry->rb;
> >>    struct prb_desc *desc;
> >>    int err;
> >> 
> >>    /* Get descriptor for the next sequence number. */
> >>    do {
> >>            seq_newest = READ_ONCE(rb->seq_newest);
> >>            seq = (seq_newest + 1) & PRB_SEQ_MASK;
> >>            seq_prev_wrap = (seq - PRB_DESC_SIZE(rb)) & PRB_SEQ_MASK;
> >> 
> >>            /*
> >>             * Remove conflicting descriptor from the previous wrap
> >>             * if ever used. It might fail when the related data
> >>             * have not been committed yet.
> >>             */
> >>            if (seq_prev_wrap == READ_ONCE(rb->seq_oldest)) {
> >>                    err = prb_remove_desc_oldest(rb, seq_prev_wrap);
> >>                    if (err)
> >>                            return err;
> >>            }
> >>    } while (cmpxchg(&rb->seq_newest, seq_newest, seq) != seq_newest);
> >> 
> >> I am referring to this point in the code, after the
> >> cmpxchg(). seq_newest has been incremented but the descriptor is
> >> still in the unused state and seq is still 1 wrap behind. If an NMI
> >> occurs here and the NMI (or some other CPU) inserts enough entries to
> >> wrap the descriptor array, this descriptor will be reserved again,
> >> even though it has already been reserved.
> >
> > Not really, the NMI will not reach the cmpxchg() in this case.
> > prb_remove_desc_oldest() will return error.
> 
> Why will prb_remove_desc_oldest() fail? IIUC, it will return success
> because the descriptor is in the desc_miss state.
> 
> > It will not be able to remove the conflicting descriptor because
> > it will still be occupied by a two-wraps-old descriptor.

Ah, I see that this situation was not handled correctly.
But it can get fixed pretty easily, see an updated
prb_remove_desc_oldest() at the end of the mail.


> Please explain why with more details. Perhaps providing a function call
> chain?  Sorry if I'm missing the obvious here.

To be on the safe side, let's try it with real numbers.

Let's have array with 8 descriptors filled with the following
sequence numbers pointing to commited messages:

        desc[10]:  16 17 18 19 20 21 22 23
        rb->seq_oldest = 16;
        rb->seq_newest = 23;

then prb_reserve_desc() would do:

        seq_newest = 23;
        seq = 24;
        seq_prew_wrap = 16;

                 prb_remove_desc_oldest(rb, 16);

                 // let's say that it succeeds and
                 // rb->seq_oldest == 17;

        cmpxchg(&rb->seq_newest, 23, 24) == 23)

        // let's say that it succeds and it is immediately
        // interrupted by NMI before desc[0]->dst is set to 24.
        // So, we still have:

        desc[10]: 16 17 18 19 20 21 22 23
        rb->seq_oldest = 17;
        rb->seq_newest = 24;

Let's say that NMI tries to print 8 messages.
After 7 successfully reserved and commited messages
we could have:

        desc[10]: 16 25 26 27 28 29 30 31
        rb->seq_oldest = 24;
        rb->seq_newest = 31;

desc[0] still has the outdated information. Now, we try to reserve
the 8th message, then prb_reserve_desc() would do:

        seq_newest = 31;
        seq = 32;
        seq_prew_wrap = 24;

                 prb_remove_desc_oldest(rb, 24);

                        desc_state = prb_read_desc(rb, 24, &desc);

                        // desc_state == desc_miss because the
                        // descriptor still points to the outdated
                        // seq = 16.

                // prb_remove_desc_oldest(rb, 24) would continue with:
                switch (desc_state) {
                /*
                 * Another seq means that the oldest desciptor has already been
                 * removed and reused. Return success in this case.
                 */
                case desc_miss:
                        return 0;

                BUG: This is obviously wrong!

But this is a special case that can get detected. desc->seq is exactly
1 wrap back than requested. The proper code would be:

static int prb_remove_desc_oldest(struct printk_ringbuffer *rb,
                                  unsigned long seq_oldest)
{
        struct prb_desc desc;
        enum prb_desc_state desc_state;
        int err;

        desc_state = prb_read_desc(rb, seq_oldest, &desc);
        switch (desc_state) {
        case desc_miss:
                unsigned long seq_prev_wrap =
                        (seq_oldest - PRB_DESC_SIZE(rb)) &
                        PRB_SEQ_MASK;

                if (desc->dst ==
                        (seq_prev_wrap |
                         PRB_COMMITED_MASK |
                         PRB_REUSABLE_MASK)) {
                         /*
                          * Special case: Reusable descriptor from the
                          * previous wrap means that the current
                          * oldest descriptor is reserved but the dst
                          * has not been updated yet.
                          */
                          return -ENOMEM;

                /*
                 * Any other desc_misc means that the oldest
                 * has been already removed and reused by a newer
                 * sequence number. Return success in this case.
                 * The attempt to update rb->seq_oldest will fail.
                 */
                return 0;

At this point, any prb_reserve() would fail exactly this way
until NMI returns and the message with seq == 24 gets commited.

Best Regards,
Petr

Reply via email to