You are correct.  However, since qmail-qmqpc does not continually run, it is
not possible to do a non-random round robin (as far as I know).  If qmqpc
could be turned into a process that ran continuously, it would be possible
to do regular round-robin and there would be no need for the random stuff at
all.  This may also increase performance because it wouldn't have to read
/var/qmail/control/qmqpservers every time it started.  I haven't looked at
it, but does qmail-inject work by just piping its data into qmail-queue
(qmail-queue is a link to qmail-qmqpc in a mini-qmail setup)?  If so,
couldn't qmail-queue be turned into a named pipe, and qmqpc could be
modified slightly to run continuously and wait for input on that named pipe?
Just a random thought.  Of course, I'm not a programmer and I may not have
any idea what the hell I'm talking about. :)

This approach that I'm using obviously isn't going to spread the load
between servers exactly evenly, but it should be close enough when mailing
large amounts of mail, unless one qmqp server dies.  However, our qmqp
servers have port 628 for QMQP monitored closely and I'll get paged within a
couple of minutes if it goes down, so I'm not too worried about it.  I guess
I could redo it so instead of just rotating the array by a random number, it
would shuffle it randomly.  That would keep it from banging on the server
immediately after the dead one, and give everything an even chance again.

Jay

-----Original Message-----
From: JuanE [mailto:[EMAIL PROTECTED]]
Sent: Tuesday, August 01, 2000 6:57 PM
To: [EMAIL PROTECTED]
Subject: Re: updated load balancing qmail-qmqpc.c mods



Jay,

If I understand this correctly, then I think there is a flaw in this type
of random round robin. I think I can best describe it with an example.

Assume you have 4 servers A, B, C, D, and that server C is down. When your
random seed hits server A, B or D (each with probability 1/4), the
message will go through fine. When your random seed hits server C (again
with p=1/4), you will realize that it is down and proceed to D. So in
effect you will hit servers A and B with probability 1/4 each, and server D
with probability 1/2. This is not very good load balancing, because
basically the load of the server that's down is inherited by the next
server, which is not what you set out to accomplish.

With simple, non-random round-robin, whenever you hit server C, you will
skip it and go to server D. The next message will go to server A, the next
to B, etc... Thus, the probability of hitting server D is 1/3 (as well as A
and B).

All this rests on my interpretation of what you have done, since I am not
familiar with the code.

JES

Austad, Jay writes:

> Ok, I fixed it.  It now just rotates the serverindex.pos[] by a random
> amount, and then loops through until it finds a good server.  rand() is
> still seeded with milliseconds from the system clock.  I used memcpy() to
> move the array around instead of loops to make it more efficient.
> 
> Would it be possible to run qmqpc as some sort of daemon?  Then it could
> have memory of which server it contacted last and make sure it went to the
> next one, instead of having the chance of picking the same one again.
> 
> Jay
> 
> 
> 
> #include <sys/types.h>
> #include <string.h>
> #include <memory.h>
> #include <sys/socket.h>
> #include <stddef.h>
> #include <netinet/in.h>
> #include <arpa/inet.h>
> #include <stdlib.h>
> #include <sys/time.h>
> #include <sys/timeb.h>
> #include "substdio.h"
> #include "getln.h"
> #include "readwrite.h"
> #include "exit.h"
> #include "stralloc.h"
> #include "slurpclose.h"
> #include "error.h"
> #include "sig.h"
> #include "ip.h"
> #include "timeoutconn.h"
> #include "timeoutread.h"
> #include "timeoutwrite.h"
> #include "auto_qmail.h"
> #include "control.h"
> #include "fmt.h"
> 
> #define PORT_QMQP 628
> 
> void die_success() { _exit(0); }
> void die_perm() { _exit(31); }
> void nomem() { _exit(51); }
> void die_read() { if (errno == error_nomem) nomem(); _exit(54); }
> void die_control() { _exit(55); }
> void die_socket() { _exit(56); }
> void die_home() { _exit(61); }
> void die_temp() { _exit(71); }
> void die_conn() { _exit(74); }
> void die_format() { _exit(91); }
> 
/* Status main() exits with when no server produced a verdict.  It starts
   as 55, the same value die_control() uses; doit() replaces it with 72
   (connect timed out) or 73 (connect failed otherwise). */
int lasterror = 55;

/* Descriptor of the QMQP connection doit() most recently opened. */
int qmqpfd;

/*
 * I/O callbacks for the to/from substdio streams.  The fd argument is
 * ignored on purpose (those streams are built with fd -1); every transfer
 * goes over qmqpfd with a 60-second timeout passed to timeoutread() /
 * timeoutwrite().  A zero or negative result ends the program through
 * die_conn(), so callers only ever see a positive byte count.
 */
int saferead(fd,buf,len) int fd; char *buf; int len;
{
  int got = timeoutread(60,qmqpfd,buf,len);

  if (got > 0)
    return got;
  die_conn();
  return got;
}

int safewrite(fd,buf,len) int fd; char *buf; int len;
{
  int sent = timeoutwrite(60,qmqpfd,buf,len);

  if (sent > 0)
    return sent;
  die_conn();
  return sent;
}
> 
> char buf[1024];
> substdio to = SUBSTDIO_FDBUF(safewrite,-1,buf,sizeof buf);
> substdio from = SUBSTDIO_FDBUF(saferead,-1,buf,sizeof buf);
> substdio envelope = SUBSTDIO_FDBUF(read,1,buf,sizeof buf);
> /* WARNING: can use only one of these at a time! */
> 
> stralloc beforemessage = {0};
> stralloc message = {0};
> stralloc aftermessage = {0};
> 
> char strnum[FMT_ULONG];
> stralloc line = {0};
> 
/*
 * Start offsets of the entries of a '\0'-separated list
 * (main() uses it for the entries of control/qmqpservers).
 * Room for at most 256 offsets; len says how many are in use.
 */
struct sindex
{
  int pos[256];   /* offset of each entry's first byte */
  int len;        /* number of valid slots in pos[] */
};
> 
> void getmess()
> {
>   int match;
> 
>   if (slurpclose(0,&message,1024) == -1) die_read();
> 
>   strnum[fmt_ulong(strnum,(unsigned long) message.len)] = 0;
>   if (!stralloc_copys(&beforemessage,strnum)) nomem();
>   if (!stralloc_cats(&beforemessage,":")) nomem();
>   if (!stralloc_copys(&aftermessage,",")) nomem();
> 
>   if (getln(&envelope,&line,&match,'\0') == -1) die_read();
>   if (!match) die_format();
>   if (line.len < 2) die_format();
>   if (line.s[0] != 'F') die_format();
> 
>   strnum[fmt_ulong(strnum,(unsigned long) line.len - 2)] = 0;
>   if (!stralloc_cats(&aftermessage,strnum)) nomem();
>   if (!stralloc_cats(&aftermessage,":")) nomem();
>   if (!stralloc_catb(&aftermessage,line.s + 1,line.len - 2)) nomem();
>   if (!stralloc_cats(&aftermessage,",")) nomem();
> 
>   for (;;) {
>     if (getln(&envelope,&line,&match,'\0') == -1) die_read();
>     if (!match) die_format();
>     if (line.len < 2) break;
>     if (line.s[0] != 'T') die_format();
> 
>     strnum[fmt_ulong(strnum,(unsigned long) line.len - 2)] = 0;
>     if (!stralloc_cats(&aftermessage,strnum)) nomem();
>     if (!stralloc_cats(&aftermessage,":")) nomem();
>     if (!stralloc_catb(&aftermessage,line.s + 1,line.len - 2)) nomem();
>     if (!stralloc_cats(&aftermessage,",")) nomem();
>   }
> }
> 
> void doit(server)
> char *server;
> {
>   struct ip_address ip;
>   char ch;
> 
>   if (!ip_scan(server,&ip)) return;
> 
>   qmqpfd = socket(AF_INET,SOCK_STREAM,0);
>   if (qmqpfd == -1) die_socket();
> 
>   if (timeoutconn(qmqpfd,&ip,PORT_QMQP,10) != 0) {
>     lasterror = 73;
>     if (errno == error_timeout) lasterror = 72;
>     close(qmqpfd);
>     return;
>   }
> 
>   strnum[fmt_ulong(strnum,(unsigned long) (beforemessage.len + message.len
+
> aftermessage.len))] = 0;
>   substdio_puts(&to,strnum);
>   substdio_puts(&to,":");
>   substdio_put(&to,beforemessage.s,beforemessage.len);
>   substdio_put(&to,message.s,message.len);
>   substdio_put(&to,aftermessage.s,aftermessage.len);
>   substdio_puts(&to,",");
>   substdio_flush(&to);
> 
>   for (;;) {
>     substdio_get(&from,&ch,1);
>     if (ch == 'K') die_success();
>     if (ch == 'Z') die_temp();
>     if (ch == 'D') die_perm();
>   }
> }
> 
> stralloc servers = {0};
> 
> main()
> {
>   int i;
>   int j;
>   int randj;
>   int randarr[256];
>   struct timeb tp;
>   struct sindex serverindex;
>   
>   sig_pipeignore();
> 
>   if (chdir(auto_qmail) == -1) die_home();
>   if (control_init() == -1) die_control();
>   if (control_readfile(&servers,"control/qmqpservers",0) != 1)
> die_control();
> 
>   getmess();
>   serverindex.len = 1;
>   serverindex.pos[0]=0;
>   for (j = 0; j < servers.len; j++)
>   {
>                 if (servers.s[j] == NULL) {
>                                 serverindex.pos[serverindex.len] = j+1;
>                                 serverindex.len++;
>                 }
>   }
>   serverindex.len--;
>   ftime(&tp);
>   srand(tp.millitm);
>   randj = (serverindex.len*1.0)*rand()/(RAND_MAX+1.0);
> 
>   memcpy(randarr, serverindex.pos + randj, 4 * (serverindex.len - randj));
>   memcpy(randarr + (serverindex.len-randj), serverindex.pos, (randj) * 4);
> 
> for (j=0; j < serverindex.len; j++)
>   {
>         doit(servers.s + randarr[j]);
>   }
>                                                 
> 
>   _exit(lasterror);
> }
> 


Reply via email to