costin      02/04/25 12:21:58

  Modified:    jk/native2/common jk_worker_lb.c
  Log:
  Added some code to check the shm 'version' and update the config.
  
  Not completed.
  
  Revision  Changes    Path
  1.4       +173 -97   jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c
  
  Index: jk_worker_lb.c
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- jk_worker_lb.c    15 Apr 2002 23:47:57 -0000      1.3
  +++ jk_worker_lb.c    25 Apr 2002 19:21:58 -0000      1.4
  @@ -60,7 +60,7 @@
    *              several workers.                                           *
    * Author:      Gal Shachor <[EMAIL PROTECTED]>                           *
    * Based on:                                                               *
  - * Version:     $Revision: 1.3 $                                           *
  + * Version:     $Revision: 1.4 $                                           *
    ***************************************************************************/
   
   #include "jk_pool.h"
  @@ -70,6 +70,7 @@
   #include "jk_config.h"
   #include "jk_env.h"
   #include "jk_requtil.h"
  +#include "jk_mt.h"
   
   #define DEFAULT_LB_FACTOR           (1.0)
   
  @@ -83,15 +84,15 @@
    * This + ADDITIONAL_WAIT_LOAD will be set on all the workers
    * that recover after an error.
    */
  -static double jk2_get_max_lb(jk_worker_t *p) 
  +static double jk2_get_max_lb(jk_worker_t *lb) 
   {
       int i;
       double rc = 0.0;    
   
  -    for(i = 0 ; i < p->num_of_workers ; i++) {
  -        if(!p->lb_workers[i]->in_error_state) {
  -            if(p->lb_workers[i]->lb_value > rc) {
  -                rc = p->lb_workers[i]->lb_value;
  +    for(i = 0 ; i < lb->num_of_workers ; i++) {
  +        if(!lb->lb_workers[i]->in_error_state) {
  +            if(lb->lb_workers[i]->lb_value > rc) {
  +                rc = lb->lb_workers[i]->lb_value;
               }
           }            
       }
  @@ -106,7 +107,7 @@
   
       It'll also adjust the load balancing factors.
   */
  -static jk_worker_t *jk2_get_most_suitable_worker(jk_env_t *env, jk_worker_t *p, 
  +static jk_worker_t *jk2_get_most_suitable_worker(jk_env_t *env, jk_worker_t *lb, 
                                                    jk_ws_service_t *s, int attempt)
   {
       jk_worker_t *rc = NULL;
  @@ -117,37 +118,37 @@
       session_route = jk2_requtil_getSessionRoute(env, s);
          
       if(session_route) {
  -        for(i = 0 ; i < p->num_of_workers ; i++) {
  -            if(0 == strcmp(session_route, p->lb_workers[i]->mbean->name)) {
  -                if(attempt > 0 && p->lb_workers[i]->in_error_state) {
  +        for(i = 0 ; i < lb->num_of_workers ; i++) {
  +            if(0 == strcmp(session_route, lb->lb_workers[i]->mbean->name)) {
  +                if(attempt > 0 && lb->lb_workers[i]->in_error_state) {
                      break;
                   } else {
  -                    return p->lb_workers[i];
  +                    return lb->lb_workers[i];
                   }
               }
           }
       }
   
       /** Get one worker that is ready */
  -    for(i = 0 ; i < p->num_of_workers ; i++) {
  -        if(p->lb_workers[i]->in_error_state) {
  -            if(!p->lb_workers[i]->in_recovering) {
  +    for(i = 0 ; i < lb->num_of_workers ; i++) {
  +        if(lb->lb_workers[i]->in_error_state) {
  +            if(!lb->lb_workers[i]->in_recovering) {
                   time_t now = time(0);
                   
  -                if((now - p->lb_workers[i]->error_time) > WAIT_BEFORE_RECOVER) {
  +                if((now - lb->lb_workers[i]->error_time) > WAIT_BEFORE_RECOVER) {
                       
  -                    p->lb_workers[i]->in_recovering  = JK_TRUE;
  -                    p->lb_workers[i]->error_time     = now;
  -                    p->lb_workers[i]->retry_count++;
  -                    rc = p->lb_workers[i];
  +                    lb->lb_workers[i]->in_recovering  = JK_TRUE;
  +                    lb->lb_workers[i]->error_time     = now;
  +                    lb->lb_workers[i]->retry_count++;
  +                    rc = lb->lb_workers[i];
   
                       break;
                   }
               }
           } else {
  -            if(p->lb_workers[i]->lb_value < lb_min || !rc) {
  -                lb_min = p->lb_workers[i]->lb_value;
  -                rc = p->lb_workers[i];
  +            if(lb->lb_workers[i]->lb_value < lb_min || !rc) {
  +                lb_min = lb->lb_workers[i]->lb_value;
  +                rc = lb->lb_workers[i];
               }
           }            
       }
  @@ -155,29 +156,29 @@
       if ( !rc ) {
           /* no workers found (rc is null), now try as hard as possible to get a
              worker anyway, pick one with largest error time.. */
  -        for(i = 0 ; i < p->num_of_workers ; i++) {
  -            if(p->lb_workers[i]->in_error_state) {
  -                if(!p->lb_workers[i]->in_recovering) {
  +        for(i = 0 ; i < lb->num_of_workers ; i++) {
  +            if(lb->lb_workers[i]->in_error_state) {
  +                if(!lb->lb_workers[i]->in_recovering) {
                       /* if the retry count is zero, that means the worker only
                          failed once, this is to e that the failed worker will
                          not continue to be retried over and over again.
                       */
  -                    if ( p->lb_workers[i]->retry_count == 0 ) {
  +                    if ( lb->lb_workers[i]->retry_count == 0 ) {
                           if ( rc ) {
                               /* pick the oldest failed worker */
  -                            if ( p->lb_workers[i]->error_time < rc->error_time ) {
  -                                rc = p->lb_workers[i];
  +                            if ( lb->lb_workers[i]->error_time < rc->error_time ) {
  +                                rc = lb->lb_workers[i];
                               }
                           } else {
  -                            rc = p->lb_workers[i];
  +                            rc = lb->lb_workers[i];
                           }
                       }
                   }
               } else {
                   /* This is a good worker - it may have come to life */
  -                if(p->lb_workers[i]->lb_value < lb_min || rc != NULL) {
  -                    lb_min = p->lb_workers[i]->lb_value;
  -                    rc = p->lb_workers[i];
  +                if(lb->lb_workers[i]->lb_value < lb_min || rc != NULL) {
  +                    lb_min = lb->lb_workers[i]->lb_value;
  +                    rc = lb->lb_workers[i];
                       break;
                   }
               }
  @@ -198,43 +199,104 @@
       return rc;
   }
   
  +/** Check the scoreboard, make updates in the 'live'
  +    config
  +*/
  +static int JK_METHOD jk2_lb_updateWorkers(jk_env_t *env,
  +                                          jk_worker_t *lb,
  +                                          jk_shm_t *shm)
  +{
  +    int rc;
  +    int i;
  +
  +    if( shm== NULL || shm->head==NULL) return JK_ERR;
  +    
  +    JK_ENTER_CS(&lb->cs, rc);
  +    if(rc !=JK_TRUE) {
  +        env->l->jkLog(env, env->l, JK_LOG_ERROR,
  +                      "lb.updateWorkers() Can't enter critical section\n");
  +        return JK_ERR;
  +    }
  +    if( lb->ver == shm->head->lbVer ) {
  +        /* Was updated by some other thread */
  +        return JK_OK;
  +    }
  +
  +    /* Walk the shm and update any changed worker */
  +    env->l->jkLog(env, env->l, JK_LOG_INFO,
  +                  "lb.updateWorkers() Updating workers %d %d\n",
  +                  lb->ver, shm->head->lbVer);
  +    for( i=1; i<shm->head->lastSlot; i++ ) {
  +        jk_shm_slot_t *slot= shm->getSlot( env, shm, i );
  +        if( strncmp( slot->name, "TOMCAT:", 7 ) == 0 ) {
  +            /* */
  +            char *instanceId=slot->name+7;
  +            char *data=slot->data;
  +
  +            
  +        }
  +    }
  +
  +    lb->ver = shm->head->lbVer;
  +        
  +    JK_LEAVE_CS(&lb->cs, rc);
  +    return JK_OK;
  +}
  +
   
   /** Get the best worker and forward to it.
       Since we don't directly connect to anything, there's no
       need for an endpoint.
   */
   static int JK_METHOD jk2_lb_service(jk_env_t *env,
  -                                    jk_worker_t *w,
  +                                    jk_worker_t *lb,
                                       jk_ws_service_t *s)
   {
       int attempt=0;
       int i;
  -
  +    jk_workerEnv_t *wEnv=lb->workerEnv;
  +    
       if( s==NULL ) {
           env->l->jkLog(env, env->l, JK_LOG_ERROR,
                         "lb.service() NullPointerException\n");
  -        return JK_FALSE;
  +        return JK_ERR;
       }
   
       /* you can not recover on another load balancer */
       s->realWorker=NULL;
   
  -       /* reset all the retry counts to 0 */
  -       for(i = 0 ; i < w->num_of_workers ; i++) {
  -           w->lb_workers[i]->retry_count = 0;
  -       }
  -
  +    /* reset all the retry counts to 0. XXX may be a problem if we have many 
workers ? */
  +    for(i = 0 ; i < lb->num_of_workers ; i++) {
  +        lb->lb_workers[i]->retry_count = 0;
  +    }
   
  +    if( wEnv->shm != NULL && wEnv->shm->head != NULL ) {
  +        /* We have shm, let's check for updates. This is just checking one
  +           memory location, no lock involved. It is possible we'll read it
  +           while somebody else is changing - but that's ok, we just check for
  +           equality.
  +        */
  +        if( lb->ver != wEnv->shm->head->lbVer ) {
  +            jk2_lb_updateWorkers(env, lb, wEnv->shm);
  +        }
  +    }
       
       while(1) {
           jk_worker_t *rec;
           int rc;
   
  -        if( w->num_of_workers==1 ) {
  +        /* Prevent loops */
  +        if( attempt > lb->num_of_workers + 1 ) {
  +            env->l->jkLog(env, env->l, JK_LOG_ERROR,
  +                          "lb.service() max attempts exceeded %d\n", attempt);
  +            return JK_ERR;
  +        }
  +        
  +        if( lb->num_of_workers==1 ) {
               /* A single worker - no need to search */
  -            rec=w->lb_workers[0];
  +            rec=lb->lb_workers[0];
           } else {
  -            rec=jk2_get_most_suitable_worker(env, w, s, attempt++);
  +            rec=jk2_get_most_suitable_worker(env, lb, s, attempt++);
           }
           
           s->is_recoverable_error = JK_FALSE;
  @@ -243,7 +305,7 @@
               /* NULL record, no more workers left ... */
               env->l->jkLog(env, env->l, JK_LOG_ERROR, 
                             "lb_worker.service() No suitable workers left \n");
  -            return JK_FALSE;
  +            return JK_ERR;
           }
                   
           env->l->jkLog(env, env->l, JK_LOG_INFO,
  @@ -253,7 +315,7 @@
   
           rc = rec->service(env, rec, s);
   
  -        if(rc==JK_TRUE) {                        
  +        if(rc==JK_OK) {                        
               if(rec->in_recovering) {
                   rec->lb_value = jk2_get_max_lb(rec) + ADDITINAL_WAIT_LOAD;
               }
  @@ -263,7 +325,7 @@
               rec->error_time     = 0;
               /* the endpoint that succeeded is saved for done() */
               s->realWorker = rec;
  -            return JK_TRUE;
  +            return JK_OK;
           }
           
           /*
  @@ -289,51 +351,51 @@
           env->l->jkLog(env, env->l, JK_LOG_INFO, 
                         "lb_worker.service() try other host\n");
       }
  -    return JK_FALSE;
  +    return JK_ERR;
   }
   
   /** Init internal structures.
       Called any time the config changes
   */
  -static int JK_METHOD jk2_lb_initLbArray(jk_env_t *env, jk_worker_t *_this)
  +static int JK_METHOD jk2_lb_refresh(jk_env_t *env, jk_worker_t *lb)
   {
       int currentWorker=0;
       int i;
  -    _this->num_of_workers=_this->lbWorkerMap->size( env, _this->lbWorkerMap);
  +    lb->num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap);
   
  -    if( _this->lb_workers_size < _this->num_of_workers ) {
  -        if( _this->lb_workers_size==0 ) {
  -            _this->lb_workers_size=10;
  +    if( lb->lb_workers_size < lb->num_of_workers ) {
  +        if( lb->lb_workers_size==0 ) {
  +            lb->lb_workers_size=10;
           } else {
  -            _this->lb_workers_size = 2 * _this->lb_workers_size;
  +            lb->lb_workers_size = 2 * lb->lb_workers_size;
           }
  -        _this->lb_workers =
  -            _this->pool->alloc(env, _this->pool, 
  -                               _this->lb_workers_size * sizeof(jk_worker_t *));
  -        if(!_this->lb_workers) {
  +        lb->lb_workers =
  +            lb->pool->alloc(env, lb->pool, 
  +                               lb->lb_workers_size * sizeof(jk_worker_t *));
  +        if(!lb->lb_workers) {
               env->l->jkLog(env, env->l, JK_LOG_ERROR,
                             "lb_worker.validate(): OutOfMemoryException\n");
  -            return JK_FALSE;
  +            return JK_ERR;
           }
       }    
   
  -    for(i = 0 ; i < _this->num_of_workers ; i++) {
  -        char *name = _this->lbWorkerMap->nameAt( env, _this->lbWorkerMap, i);
  +    for(i = 0 ; i < lb->num_of_workers ; i++) {
  +        char *name = lb->lbWorkerMap->nameAt( env, lb->lbWorkerMap, i);
           jk_worker_t *w= env->getByName( env, name );
           if( w== NULL ) {
               env->l->jkLog(env, env->l, JK_LOG_ERROR,
                             "lb_worker.init(): no worker found %s\n", name);
  -           _this->num_of_workers--;
  +           lb->num_of_workers--;
               continue;
           }
           
  -        _this->lb_workers[currentWorker]=w;
  +        lb->lb_workers[currentWorker]=w;
   
  -        if( _this->lb_workers[currentWorker]->lb_factor == 0 )
  -            _this->lb_workers[currentWorker]->lb_factor = DEFAULT_LB_FACTOR;
  +        if( w->lb_factor == 0 )
  +            w->lb_factor = DEFAULT_LB_FACTOR;
           
  -        _this->lb_workers[currentWorker]->lb_factor =
  -            1/ _this->lb_workers[currentWorker]->lb_factor;
  +        w->lb_factor =
  +            1/ w->lb_factor;
   
           /* 
            * Allow using lb in fault-tolerant mode.
  @@ -341,69 +403,83 @@
            * a worker used only when principal is down or session route
            * point to it. Provided by Paul Frieden <[EMAIL PROTECTED]>
            */
  -        _this->lb_workers[currentWorker]->lb_value =
  -            _this->lb_workers[currentWorker]->lb_factor;
  -        _this->lb_workers[currentWorker]->in_error_state = JK_FALSE;
  -        _this->lb_workers[currentWorker]->in_recovering  = JK_FALSE;
  -        _this->lb_workers[currentWorker]->retry_count  = 0;
  +        w->lb_value =
  +            w->lb_factor;
  +        w->in_error_state = JK_FALSE;
  +        w->in_recovering  = JK_FALSE;
  +        w->retry_count  = 0;
   
           currentWorker++;
       }
  -    return JK_TRUE;
  +    return JK_OK;
  +}
  +
  +static int JK_METHOD jk2_lb_addWorker(jk_env_t *env, jk_worker_t *lb, 
  +                                      char *name)
  +{
  +    name = lb->pool->pstrdup(env, lb->pool, name);
  +    lb->lbWorkerMap->add(env, lb->lbWorkerMap, name, "");
  +    
  +    env->l->jkLog(env, env->l, JK_LOG_INFO,
  +                  "lb_worker.setAttribute(): Adding %s %s\n", lb->mbean->name, 
name);
  +
   }
   
   static int JK_METHOD jk2_lb_setProperty(jk_env_t *env, jk_bean_t *mbean, 
                                           char *name, void *valueP)
   {
  -    jk_worker_t *_this=mbean->object;
  +    jk_worker_t *lb=mbean->object;
       char *value=valueP;
       int err;
       char **worker_names;
       unsigned num_of_workers;
       unsigned i = 0;
       char *tmp;
  -
  -    /* XXX Add one-by-one */
       
       if( strcmp( name, "balanced_workers") == 0 ) {
  -        worker_names=jk2_config_split( env,  _this->pool,
  +        worker_names=jk2_config_split( env,  lb->pool,
                                          value, NULL, &num_of_workers );
           if( worker_names==NULL || num_of_workers==0 ) {
               env->l->jkLog(env, env->l, JK_LOG_ERROR,
                             "lb_worker.validate(): no defined workers\n");
  -            return JK_FALSE;
  +            return JK_ERR;
           }
           for(i = 0 ; i < num_of_workers ; i++) {
  -            char *name = _this->pool->pstrdup(env, _this->pool, worker_names[i]);
  -            _this->lbWorkerMap->add(env, _this->lbWorkerMap, name, "");
  -            env->l->jkLog(env, env->l, JK_LOG_INFO,
  -                          "lb_worker.setAttribute(): Adding %s %s\n", 
_this->mbean->name, name);
  +            jk2_lb_addWorker( env, lb, worker_names[i]);
           }
  -        jk2_lb_initLbArray( env, _this );
  -        return JK_TRUE;
  +        jk2_lb_refresh( env, lb );
  +        return JK_OK;
  +    } else if( strcmp( name, "worker") == 0 ) {
  +        jk2_lb_addWorker( env, lb, value);
  +        jk2_lb_refresh( env, lb );
  +        return JK_OK;
       }
  -    return JK_FALSE;
  +    return JK_ERR;
   }
   
   
  -static int JK_METHOD jk2_lb_init(jk_env_t *env, jk_worker_t *_this)
  +static int JK_METHOD jk2_lb_init(jk_env_t *env, jk_worker_t *lb)
   {
       int err;
       char **worker_names;
       int i = 0;
       char *tmp;
   
  -    int num_of_workers=_this->lbWorkerMap->size( env, _this->lbWorkerMap);
  +    int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap);
   
  -    err=jk2_lb_initLbArray(env, _this );
  -    if( err != JK_TRUE )
  +    err=jk2_lb_refresh(env, lb );
  +    if( err != JK_OK )
           return err;
  -        
  +
  +    lb->ver=0;
  +    if( lb->workerEnv->shm != NULL && lb->workerEnv->shm->head != NULL) 
  +        jk2_lb_updateWorkers(env, lb, lb->workerEnv->shm);
  +
       env->l->jkLog(env, env->l, JK_LOG_INFO,
                     "lb.init() %s %d workers\n",
  -                  _this->mbean->name, _this->num_of_workers );
  -
  -    return JK_TRUE;
  +                  lb->mbean->name, lb->num_of_workers );
  +    
  +    return JK_OK;
   }
   
   static int JK_METHOD jk2_lb_destroy(jk_env_t *env, jk_worker_t *w)
  @@ -413,7 +489,7 @@
       if(w==NULL ) {
           env->l->jkLog(env, env->l, JK_LOG_ERROR,
                         "lb_worker.destroy() NullPointerException\n");
  -        return JK_FALSE;
  +        return JK_ERR;
       }
   
       /* Workers are destroyed by the workerEnv. It is possible
  @@ -427,7 +503,7 @@
   
       w->pool->close(env, w->pool);    
   
  -    return JK_TRUE;
  +    return JK_OK;
   }
   
   
  @@ -439,7 +515,7 @@
       if(NULL == name ) {
           env->l->jkLog(env, env->l, JK_LOG_ERROR,
                         "lb_worker.factory() NullPointerException\n");
  -        return JK_FALSE;
  +        return JK_ERR;
       }
       
       w = (jk_worker_t *)pool->calloc(env, pool, sizeof(jk_worker_t));
  @@ -447,7 +523,7 @@
       if(w==NULL) {
           env->l->jkLog(env, env->l, JK_LOG_ERROR,
                         "lb_worker.factory() OutOfMemoryException\n");
  -        return JK_FALSE;
  +        return JK_ERR;
       }
   
       w->pool=pool;
  @@ -468,6 +544,6 @@
       w->workerEnv=env->getByName( env, "workerEnv" );
       w->workerEnv->addWorker( env, w->workerEnv, w );
       
  -    return JK_TRUE;
  +    return JK_OK;
   }
   
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to