Apologies to those who don't like MIME, but for the sake of easily
highlighting in this long append I chose to use fonts & colors.
I've been spending a lot of time trying to track a problem related to doing ipc
things in a multi-cpu configuration. I've been using Linux/390 as the debug
vehicle (what would I do without #CP CPU ALL TR STOR INTO ??) but the
problem also exhibits itself on Intel. I've got so far as to see that a
message element (specifically msg_receiver) is being overwritten while it's
waiting in queue by another part of the kernel (in my case restore_sigregs,
but that's just coincidental, not salient). The timing window for this is
very small, such that some traps take > 10 hours to spring, but sometimes it's
20 minutes or so (more often it's about 90-120 mins - what a fun day).
Looking at the relevant routine (sys_msgrcv) I note that the queue element is
declared in the routine's stack area, so that once the routine exits this area
of storage should be up for grabs. The routine that eventually processes this
element is pipelined_send.
My question to the list is: Is my understanding correct? There is an error
that leads to the system Oopsing due to getting a NULL address from the
contents of this queue element, but is the reason I've come up with the right
one?
/*
 * sys_msgrcv - receive one message from the message queue identified by msqid.
 *
 * msqid:  queue identifier; a negative value yields -EINVAL.
 * msgp:   user buffer; the message type is written to msgp->mtype and the
 *         payload to msgp->mtext.
 * msgsz:  capacity of msgp->mtext; yields -EINVAL if negative when viewed
 *         as a long.
 * msgtyp: type selector, converted together with msgflg by convert_mode().
 * msgflg: flags; IPC_NOWAIT and MSG_NOERROR are examined here.
 *
 * Returns the number of payload bytes stored on success, otherwise a
 * negative errno: -EINVAL, -EIDRM, -EACCES, -E2BIG, -ENOMSG, -EINTR,
 * -EFAULT, or the error a waker left in msr_d.r_msg.
 *
 * If no matching message is queued and IPC_NOWAIT is not set, the caller
 * links a msg_receiver that lives on its own stack into msq->q_receivers and
 * sleeps.  That element must not be touched by anyone else once this function
 * returns, which is why the wake-up path below always re-takes the queue lock
 * before looking at the result.
 */
asmlinkage long sys_msgrcv (int msqid, struct msgbuf *msgp, size_t msgsz,
			    long msgtyp, int msgflg)
{
	struct msg_queue *msq;
	struct msg_receiver msr_d;	/* <- Element that will be placed on the queue */
	struct list_head* tmp;
	struct msg_msg* msg, *found_msg;
	int err;
	int mode;

	if (msqid < 0 || (long) msgsz < 0)
		return -EINVAL;
	mode = convert_mode(&msgtyp,msgflg);

	msq = msg_lock(msqid);
	if(msq==NULL)
		return -EINVAL;
retry:
	err = -EIDRM;
	if (msg_checkid(msq,msqid))
		goto out_unlock;

	err=-EACCES;
	if (ipcperms (&msq->q_perm, S_IRUGO))
		goto out_unlock;

	/* Scan the queued messages for one that satisfies (msgtyp, mode). */
	tmp = msq->q_messages.next;
	found_msg=NULL;
	while (tmp != &msq->q_messages) {
		msg = list_entry(tmp,struct msg_msg,m_list);
		if(testmsg(msg,msgtyp,mode)) {
			found_msg = msg;
			if(mode == SEARCH_LESSEQUAL && msg->m_type != 1) {
				/* keep scanning, but only accept a strictly
				 * smaller type from here on
				 */
				msgtyp=msg->m_type-1;
			} else {
				break;
			}
		}
		tmp = tmp->next;
	}
	if(found_msg) {
		msg=found_msg;
		if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
			err=-E2BIG;
			goto out_unlock;
		}
		/* Dequeue the message and update the queue accounting. */
		list_del(&msg->m_list);
		msq->q_qnum--;
		msq->q_rtime = CURRENT_TIME;
		msq->q_lrpid = current->pid;
		msq->q_cbytes -= msg->m_ts;
		atomic_sub(msg->m_ts,&msg_bytes);
		atomic_dec(&msg_hdrs);
		ss_wakeup(&msq->q_senders,0);
		msg_unlock(msqid);
out_success:
		/* Reached with the queue lock already released. */
		msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
		if (put_user (msg->m_type, &msgp->mtype) ||
		    store_msg(msgp->mtext, msg, msgsz)) {
			msgsz = -EFAULT;
		}
		free_msg(msg);
		return msgsz;	/* <- As soon as we get here the msr_d
				 * structure is open to be overwritten
				 */
	} else
	{
		struct msg_queue *t;
		/* no message waiting. Prepare for pipelined
		 * receive.
		 */
		if (msgflg & IPC_NOWAIT) {
			err=-ENOMSG;
			goto out_unlock;
		}
		list_add_tail(&msr_d.r_list,&msq->q_receivers);
		/* <- We're now including this stack based queue element on a
		 * queue used by other IPC routines
		 */
		msr_d.r_tsk = current;
		msr_d.r_msgtype = msgtyp;
		msr_d.r_mode = mode;
		if(msgflg & MSG_NOERROR)
			msr_d.r_maxsize = INT_MAX;
		else
			msr_d.r_maxsize = msgsz;
		msr_d.r_msg = ERR_PTR(-EAGAIN);
		current->state = TASK_INTERRUPTIBLE;
		msg_unlock(msqid);

		schedule();	/* <- Our msr_d structure is protected even if
				 * we schedule other processes as our stack is
				 * still ours
				 */
		current->state = TASK_RUNNING;

		/*
		 * Do NOT look at msr_d.r_msg before re-taking the queue lock.
		 * The code used to test it here and jump straight to
		 * out_success when a message was present.  pipelined_send()
		 * keeps reading msr->r_tsk after it has stored the message in
		 * r_msg, so a receiver that returned that quickly could have
		 * its stack frame (and therefore msr_d) reused while the
		 * sender was still using it.  Taking the lock first makes us
		 * wait for the sender to finish -- this assumes senders call
		 * pipelined_send() with the queue lock held (caller not shown
		 * here; confirm).
		 */
		t = msg_lock(msqid);
		if(t==NULL)
			msqid=-1;	/* queue is gone: nothing to unlock later */
		msg = (struct msg_msg*)msr_d.r_msg;
		if(!IS_ERR(msg)) {
			/* a sender handed us a message while we slept.
			 * Process it.
			 */
			if(msqid!=-1)
				msg_unlock(msqid);
			goto out_success;
		}
		err = PTR_ERR(msg);
		if(err == -EAGAIN) {
			/* Nobody touched our element: we are still linked
			 * into q_receivers, so the queue must still exist.
			 */
			if(msqid==-1)
				BUG();
			list_del(&msr_d.r_list);
			if (signal_pending(current))
				err=-EINTR;
			else
				goto retry;
		}
	}
out_unlock:
	if(msqid!=-1)
		msg_unlock(msqid);
	return err;
}
/*
 * pipelined_send - try to hand msg directly to a receiver waiting on msq.
 *
 * Walks msq->q_receivers.  Every waiting receiver whose (r_msgtype, r_mode)
 * accepts msg according to testmsg() is unlinked from the list:
 *   - if its r_maxsize is smaller than msg->m_ts, it is woken with
 *     ERR_PTR(-E2BIG) stored in r_msg and the scan continues;
 *   - otherwise msg is stored in r_msg, q_lrpid and q_rtime are updated,
 *     the receiver is woken and 1 is returned.
 * Returns 0 if no receiver took the message.
 *
 * NOTE(review): each msg_receiver is storage owned by the waiting task (in
 * sys_msgrcv it is a stack variable).  This function still reads msr->r_tsk
 * after it has stored the result in msr->r_msg, so it is only safe if the
 * receiver cannot return before this function is done with msr -- presumably
 * guaranteed by the queue lock; confirm against the callers.
 */
int inline pipelined_send(struct msg_queue* msq, struct msg_msg* msg)
{
struct list_head* tmp;
tmp = msq->q_receivers.next; <- We pull our element off the queue
here
while (tmp != &msq->q_receivers) {
struct msg_receiver* msr;
msr = list_entry(tmp,struct msg_receiver,r_list);
/* advance first: msr may be unlinked from the list below */
tmp = tmp->next;
if(testmsg(msg,msr->r_msgtype,msr->r_mode)) {
list_del(&msr->r_list);
if(msr->r_maxsize < msg->m_ts) {
/* receiver's buffer is too small: report the error and keep looking */
msr->r_msg = ERR_PTR(-E2BIG);
wake_up_process(msr->r_tsk); <- We pass a
NULL pointer to wake_up_process and Oops
} else {
/* hand the message over; msr is still dereferenced after this store */
msr->r_msg = msg;
msq->q_lrpid = msr->r_tsk->pid;
msq->q_rtime = CURRENT_TIME;
wake_up_process(msr->r_tsk);
return 1;
}
}
}
return 0;
}
Neale Ferguson