Jay,

If I understand this correctly, then I think there is a flaw in this type
of random round robin. I think I can best describe it with an example.

Assume you have 4 servers A, B, C, D, and that server C is down. When your
random seed hits server A, B or D (each with probability 1/4), the
message will go through fine. When your random seed hits server C (again
with p=1/4), you will realize that it is down and proceed to D. So in
effect you will hit servers A and B with probability 1/4 each, and server D
with probability 1/2. This is not very good load balancing, because
basically the load of the server that's down is inherited by the next
server, which is not what you set out to accomplish.

With simple, non-random round-robin, whenever you hit server C, you will
skip it and go to server D. The next message will go to server A, the next
to B, etc... Thus, the probability of hitting server D is 1/3 (as well as A
and B).

All this rests on my interpretation of what you have done, since I am not
familiar with the code.

JES

Austad, Jay writes:

> Ok, I fixed it.  It now just rotates the serverindex.pos[] by a random
> amount, and then loops through until it finds a good server. rand() is still
> seeded with milliseconds from the system clock.  I used memcpy() to move the
> array around instead of loops to make it more efficient.
> 
> Would it be possible to run qmqpc as some sort of daemon?  Then it could
> have memory of which server it contacted last and make sure it went to the
> next one, instead of having the chance of picking the same one again.
> 
> Jay
> 
> 
> 
> #include <sys/types.h>
> #include <string.h>
> #include <memory.h>
> #include <sys/socket.h>
> #include <stddef.h>
> #include <netinet/in.h>
> #include <arpa/inet.h>
> #include <stdlib.h>
> #include <sys/time.h>
> #include <sys/timeb.h>
> #include "substdio.h"
> #include "getln.h"
> #include "readwrite.h"
> #include "exit.h"
> #include "stralloc.h"
> #include "slurpclose.h"
> #include "error.h"
> #include "sig.h"
> #include "ip.h"
> #include "timeoutconn.h"
> #include "timeoutread.h"
> #include "timeoutwrite.h"
> #include "auto_qmail.h"
> #include "control.h"
> #include "fmt.h"
> 
> #define PORT_QMQP 628
> 
> void die_success() { _exit(0); }
> void die_perm() { _exit(31); }
> void nomem() { _exit(51); }
> void die_read() { if (errno == error_nomem) nomem(); _exit(54); }
> void die_control() { _exit(55); }
> void die_socket() { _exit(56); }
> void die_home() { _exit(61); }
> void die_temp() { _exit(71); }
> void die_conn() { _exit(74); }
> void die_format() { _exit(91); }
> 
> int lasterror = 55;
> int qmqpfd;
> 
> int saferead(fd,buf,len) int fd; char *buf; int len;
> {
>   int r;
>   r = timeoutread(60,qmqpfd,buf,len);
>   if (r <= 0) die_conn();
>   return r;
> }
> int safewrite(fd,buf,len) int fd; char *buf; int len;
> {
>   int r;
>   r = timeoutwrite(60,qmqpfd,buf,len);
>   if (r <= 0) die_conn();
>   return r;
> }
> 
> char buf[1024];
> substdio to = SUBSTDIO_FDBUF(safewrite,-1,buf,sizeof buf);
> substdio from = SUBSTDIO_FDBUF(saferead,-1,buf,sizeof buf);
> substdio envelope = SUBSTDIO_FDBUF(read,1,buf,sizeof buf);
> /* WARNING: can use only one of these at a time! */
> 
> stralloc beforemessage = {0};
> stralloc message = {0};
> stralloc aftermessage = {0};
> 
> char strnum[FMT_ULONG];
> stralloc line = {0};
> 
> struct sindex
> {
>               int pos[256];
>               int len;
> };
> 
> void getmess()
> {
>   int match;
> 
>   if (slurpclose(0,&message,1024) == -1) die_read();
> 
>   strnum[fmt_ulong(strnum,(unsigned long) message.len)] = 0;
>   if (!stralloc_copys(&beforemessage,strnum)) nomem();
>   if (!stralloc_cats(&beforemessage,":")) nomem();
>   if (!stralloc_copys(&aftermessage,",")) nomem();
> 
>   if (getln(&envelope,&line,&match,'\0') == -1) die_read();
>   if (!match) die_format();
>   if (line.len < 2) die_format();
>   if (line.s[0] != 'F') die_format();
> 
>   strnum[fmt_ulong(strnum,(unsigned long) line.len - 2)] = 0;
>   if (!stralloc_cats(&aftermessage,strnum)) nomem();
>   if (!stralloc_cats(&aftermessage,":")) nomem();
>   if (!stralloc_catb(&aftermessage,line.s + 1,line.len - 2)) nomem();
>   if (!stralloc_cats(&aftermessage,",")) nomem();
> 
>   for (;;) {
>     if (getln(&envelope,&line,&match,'\0') == -1) die_read();
>     if (!match) die_format();
>     if (line.len < 2) break;
>     if (line.s[0] != 'T') die_format();
> 
>     strnum[fmt_ulong(strnum,(unsigned long) line.len - 2)] = 0;
>     if (!stralloc_cats(&aftermessage,strnum)) nomem();
>     if (!stralloc_cats(&aftermessage,":")) nomem();
>     if (!stralloc_catb(&aftermessage,line.s + 1,line.len - 2)) nomem();
>     if (!stralloc_cats(&aftermessage,",")) nomem();
>   }
> }
> 
> void doit(server)
> char *server;
> {
>   struct ip_address ip;
>   char ch;
> 
>   if (!ip_scan(server,&ip)) return;
> 
>   qmqpfd = socket(AF_INET,SOCK_STREAM,0);
>   if (qmqpfd == -1) die_socket();
> 
>   if (timeoutconn(qmqpfd,&ip,PORT_QMQP,10) != 0) {
>     lasterror = 73;
>     if (errno == error_timeout) lasterror = 72;
>     close(qmqpfd);
>     return;
>   }
> 
>   strnum[fmt_ulong(strnum,(unsigned long) (beforemessage.len + message.len +
> aftermessage.len))] = 0;
>   substdio_puts(&to,strnum);
>   substdio_puts(&to,":");
>   substdio_put(&to,beforemessage.s,beforemessage.len);
>   substdio_put(&to,message.s,message.len);
>   substdio_put(&to,aftermessage.s,aftermessage.len);
>   substdio_puts(&to,",");
>   substdio_flush(&to);
> 
>   for (;;) {
>     substdio_get(&from,&ch,1);
>     if (ch == 'K') die_success();
>     if (ch == 'Z') die_temp();
>     if (ch == 'D') die_perm();
>   }
> }
> 
> stralloc servers = {0};
> 
> main()
> {
>   int i;
>   int j;
>   int randj;
>   int randarr[256];
>   struct timeb tp;
>   struct sindex serverindex;
>   
>   sig_pipeignore();
> 
>   if (chdir(auto_qmail) == -1) die_home();
>   if (control_init() == -1) die_control();
>   if (control_readfile(&servers,"control/qmqpservers",0) != 1)
> die_control();
> 
>   getmess();
>   serverindex.len = 1;
>   serverindex.pos[0]=0;
>   for (j = 0; j < servers.len; j++)
>   {
>                 if (servers.s[j] == NULL) {
>                                 serverindex.pos[serverindex.len] = j+1;
>                                 serverindex.len++;
>                 }
>   }
>   serverindex.len--;
>   ftime(&tp);
>   srand(tp.millitm);
>   randj = (serverindex.len*1.0)*rand()/(RAND_MAX+1.0);
> 
>   memcpy(randarr, serverindex.pos + randj, 4 * (serverindex.len - randj));
>   memcpy(randarr + (serverindex.len-randj), serverindex.pos, (randj) * 4);
> 
> for (j=0; j < serverindex.len; j++)
>   {
>         doit(servers.s + randarr[j]);
>   }
>                                                 
> 
>   _exit(lasterror);
> }
> 



Reply via email to