costin 02/04/25 12:21:58 Modified: jk/native2/common jk_worker_lb.c Log: Added some code to check the shm 'version' and update the config. Not completed. Revision Changes Path 1.4 +173 -97 jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c Index: jk_worker_lb.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- jk_worker_lb.c 15 Apr 2002 23:47:57 -0000 1.3 +++ jk_worker_lb.c 25 Apr 2002 19:21:58 -0000 1.4 @@ -60,7 +60,7 @@ * several workers. * * Author: Gal Shachor <[EMAIL PROTECTED]> * * Based on: * - * Version: $Revision: 1.3 $ * + * Version: $Revision: 1.4 $ * ***************************************************************************/ #include "jk_pool.h" @@ -70,6 +70,7 @@ #include "jk_config.h" #include "jk_env.h" #include "jk_requtil.h" +#include "jk_mt.h" #define DEFAULT_LB_FACTOR (1.0) @@ -83,15 +84,15 @@ * This + ADDITIONAL_WAIT_LOAD will be set on all the workers * that recover after an error. */ -static double jk2_get_max_lb(jk_worker_t *p) +static double jk2_get_max_lb(jk_worker_t *lb) { int i; double rc = 0.0; - for(i = 0 ; i < p->num_of_workers ; i++) { - if(!p->lb_workers[i]->in_error_state) { - if(p->lb_workers[i]->lb_value > rc) { - rc = p->lb_workers[i]->lb_value; + for(i = 0 ; i < lb->num_of_workers ; i++) { + if(!lb->lb_workers[i]->in_error_state) { + if(lb->lb_workers[i]->lb_value > rc) { + rc = lb->lb_workers[i]->lb_value; } } } @@ -106,7 +107,7 @@ It'll also adjust the load balancing factors. */ -static jk_worker_t *jk2_get_most_suitable_worker(jk_env_t *env, jk_worker_t *p, +static jk_worker_t *jk2_get_most_suitable_worker(jk_env_t *env, jk_worker_t *lb, jk_ws_service_t *s, int attempt) { jk_worker_t *rc = NULL; @@ -117,37 +118,37 @@ session_route = jk2_requtil_getSessionRoute(env, s); if(session_route) { - for(i = 0 ; i < p->num_of_workers ; i++) { - if(0 == strcmp(session_route, p->lb_workers[i]->mbean->name)) { - if(attempt > 0 && p->lb_workers[i]->in_error_state) { + for(i = 0 ; i < lb->num_of_workers ; i++) { + if(0 == strcmp(session_route, lb->lb_workers[i]->mbean->name)) { + if(attempt > 0 && lb->lb_workers[i]->in_error_state) { break; } else { - return p->lb_workers[i]; + return lb->lb_workers[i]; } } } } /** Get one worker that is ready */ - for(i = 0 ; i < p->num_of_workers ; i++) { - if(p->lb_workers[i]->in_error_state) { - if(!p->lb_workers[i]->in_recovering) { + for(i = 0 ; i < lb->num_of_workers ; i++) { + if(lb->lb_workers[i]->in_error_state) { + if(!lb->lb_workers[i]->in_recovering) { time_t now = time(0); - if((now - p->lb_workers[i]->error_time) > WAIT_BEFORE_RECOVER) { + if((now - lb->lb_workers[i]->error_time) > WAIT_BEFORE_RECOVER) { - p->lb_workers[i]->in_recovering = JK_TRUE; - p->lb_workers[i]->error_time = now; - p->lb_workers[i]->retry_count++; - rc = p->lb_workers[i]; + lb->lb_workers[i]->in_recovering = JK_TRUE; + lb->lb_workers[i]->error_time = now; + lb->lb_workers[i]->retry_count++; + rc = lb->lb_workers[i]; break; } } } else { - if(p->lb_workers[i]->lb_value < lb_min || !rc) { - lb_min = p->lb_workers[i]->lb_value; - rc = p->lb_workers[i]; + if(lb->lb_workers[i]->lb_value < lb_min || !rc) { + lb_min = lb->lb_workers[i]->lb_value; + rc = lb->lb_workers[i]; } } } @@ -155,29 +156,29 @@ if ( !rc ) { /* no workers found (rc is null), now try as hard as possible to get a worker anyway, pick one with largest error time.. */ - for(i = 0 ; i < p->num_of_workers ; i++) { - if(p->lb_workers[i]->in_error_state) { - if(!p->lb_workers[i]->in_recovering) { + for(i = 0 ; i < lb->num_of_workers ; i++) { + if(lb->lb_workers[i]->in_error_state) { + if(!lb->lb_workers[i]->in_recovering) { /* if the retry count is zero, that means the worker only failed once, this is to e that the failed worker will not continue to be retried over and over again. */ - if ( p->lb_workers[i]->retry_count == 0 ) { + if ( lb->lb_workers[i]->retry_count == 0 ) { if ( rc ) { /* pick the oldest failed worker */ - if ( p->lb_workers[i]->error_time < rc->error_time ) { - rc = p->lb_workers[i]; + if ( lb->lb_workers[i]->error_time < rc->error_time ) { + rc = lb->lb_workers[i]; } } else { - rc = p->lb_workers[i]; + rc = lb->lb_workers[i]; } } } } else { /* This is a good worker - it may have come to life */ - if(p->lb_workers[i]->lb_value < lb_min || rc != NULL) { - lb_min = p->lb_workers[i]->lb_value; - rc = p->lb_workers[i]; + if(lb->lb_workers[i]->lb_value < lb_min || rc != NULL) { + lb_min = lb->lb_workers[i]->lb_value; + rc = lb->lb_workers[i]; break; } } @@ -198,43 +199,104 @@ return rc; } +/** Check the scoreboard, make updates in the 'live' + config +*/ +static int JK_METHOD jk2_lb_updateWorkers(jk_env_t *env, + jk_worker_t *lb, + jk_shm_t *shm) +{ + int rc; + int i; + + if( shm== NULL || shm->head==NULL) return JK_ERR; + + JK_ENTER_CS(&lb->cs, rc); + if(rc !=JK_TRUE) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "lb.updateWorkers() Can't enter critical section\n"); + return JK_ERR; + } + if( lb->ver == shm->head->lbVer ) { + /* Was updated by some other thread */ + return JK_OK; + } + + /* Walk the shm and update any changed worker */ + env->l->jkLog(env, env->l, JK_LOG_INFO, + "lb.updateWorkers() Updating workers %d %d\n", + lb->ver, shm->head->lbVer); + for( i=1; i<shm->head->lastSlot; i++ ) { + jk_shm_slot_t *slot= shm->getSlot( env, shm, i ); + if( strncmp( slot->name, "TOMCAT:", 7 ) == 0 ) { + /* */ + char *instanceId=slot->name+7; + char *data=slot->data; + + + } + } + + lb->ver = shm->head->lbVer; + + JK_LEAVE_CS(&lb->cs, rc); + return JK_OK; +} + /** Get the best worker and forward to it. Since we don't directly connect to anything, there's no need for an endpoint. */ static int JK_METHOD jk2_lb_service(jk_env_t *env, - jk_worker_t *w, + jk_worker_t *lb, jk_ws_service_t *s) { int attempt=0; int i; - + jk_workerEnv_t *wEnv=lb->workerEnv; + if( s==NULL ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb.service() NullPointerException\n"); - return JK_FALSE; + return JK_ERR; } /* you can not recover on another load balancer */ s->realWorker=NULL; - /* reset all the retry counts to 0 */ - for(i = 0 ; i < w->num_of_workers ; i++) { - w->lb_workers[i]->retry_count = 0; - } - + /* reset all the retry counts to 0. XXX may be a problem if we have many workers ? */ + for(i = 0 ; i < lb->num_of_workers ; i++) { + lb->lb_workers[i]->retry_count = 0; + } + if( wEnv->shm != NULL && wEnv->shm->head != NULL ) { + /* We have shm, let's check for updates. This is just checking one + memory location, no lock involved. It is possible we'll read it + while somebody else is changing - but that's ok, we just check for + equality. + */ + if( lb->ver != wEnv->shm->head->lbVer ) { + jk2_lb_updateWorkers(env, lb, wEnv->shm); + } + } while(1) { jk_worker_t *rec; int rc; - if( w->num_of_workers==1 ) { + /* Prevent loops */ + if( attempt > lb->num_of_workers + 1 ) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "lb.service() max attempts exceeded %d\n", attempt); + return JK_ERR; + } + + if( lb->num_of_workers==1 ) { /* A single worker - no need to search */ - rec=w->lb_workers[0]; + rec=lb->lb_workers[0]; } else { - rec=jk2_get_most_suitable_worker(env, w, s, attempt++); + rec=jk2_get_most_suitable_worker(env, lb, s, attempt++); } s->is_recoverable_error = JK_FALSE; @@ -243,7 +305,7 @@ /* NULL record, no more workers left ... */ env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.service() No suitable workers left \n"); - return JK_FALSE; + return JK_ERR; } env->l->jkLog(env, env->l, JK_LOG_INFO, @@ -253,7 +315,7 @@ rc = rec->service(env, rec, s); - if(rc==JK_TRUE) { + if(rc==JK_OK) { if(rec->in_recovering) { rec->lb_value = jk2_get_max_lb(rec) + ADDITINAL_WAIT_LOAD; } @@ -263,7 +325,7 @@ rec->error_time = 0; /* the endpoint that succeeded is saved for done() */ s->realWorker = rec; - return JK_TRUE; + return JK_OK; } /* @@ -289,51 +351,51 @@ env->l->jkLog(env, env->l, JK_LOG_INFO, "lb_worker.service() try other host\n"); } - return JK_FALSE; + return JK_ERR; } /** Init internal structures. Called any time the config changes */ -static int JK_METHOD jk2_lb_initLbArray(jk_env_t *env, jk_worker_t *_this) +static int JK_METHOD jk2_lb_refresh(jk_env_t *env, jk_worker_t *lb) { int currentWorker=0; int i; - _this->num_of_workers=_this->lbWorkerMap->size( env, _this->lbWorkerMap); + lb->num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap); - if( _this->lb_workers_size < _this->num_of_workers ) { - if( _this->lb_workers_size==0 ) { - _this->lb_workers_size=10; + if( lb->lb_workers_size < lb->num_of_workers ) { + if( lb->lb_workers_size==0 ) { + lb->lb_workers_size=10; } else { - _this->lb_workers_size = 2 * _this->lb_workers_size; + lb->lb_workers_size = 2 * lb->lb_workers_size; } - _this->lb_workers = - _this->pool->alloc(env, _this->pool, - _this->lb_workers_size * sizeof(jk_worker_t *)); - if(!_this->lb_workers) { + lb->lb_workers = + lb->pool->alloc(env, lb->pool, + lb->lb_workers_size * sizeof(jk_worker_t *)); + if(!lb->lb_workers) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.validate(): OutOfMemoryException\n"); - return JK_FALSE; + return JK_ERR; } } - for(i = 0 ; i < _this->num_of_workers ; i++) { - char *name = _this->lbWorkerMap->nameAt( env, _this->lbWorkerMap, i); + for(i = 0 ; i < lb->num_of_workers ; i++) { + char *name = lb->lbWorkerMap->nameAt( env, lb->lbWorkerMap, i); jk_worker_t *w= env->getByName( env, name ); if( w== NULL ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.init(): no worker found %s\n", name); - _this->num_of_workers--; + lb->num_of_workers--; continue; } - _this->lb_workers[currentWorker]=w; + lb->lb_workers[currentWorker]=w; - if( _this->lb_workers[currentWorker]->lb_factor == 0 ) - _this->lb_workers[currentWorker]->lb_factor = DEFAULT_LB_FACTOR; + if( w->lb_factor == 0 ) + w->lb_factor = DEFAULT_LB_FACTOR; - _this->lb_workers[currentWorker]->lb_factor = - 1/ _this->lb_workers[currentWorker]->lb_factor; + w->lb_factor = + 1/ w->lb_factor; /* * Allow using lb in fault-tolerant mode. @@ -341,69 +403,83 @@ * a worker used only when principal is down or session route * point to it. Provided by Paul Frieden <[EMAIL PROTECTED]> */ - _this->lb_workers[currentWorker]->lb_value = - _this->lb_workers[currentWorker]->lb_factor; - _this->lb_workers[currentWorker]->in_error_state = JK_FALSE; - _this->lb_workers[currentWorker]->in_recovering = JK_FALSE; - _this->lb_workers[currentWorker]->retry_count = 0; + w->lb_value = + w->lb_factor; + w->in_error_state = JK_FALSE; + w->in_recovering = JK_FALSE; + w->retry_count = 0; currentWorker++; } - return JK_TRUE; + return JK_OK; +} + +static int JK_METHOD jk2_lb_addWorker(jk_env_t *env, jk_worker_t *lb, + char *name) +{ + name = lb->pool->pstrdup(env, lb->pool, name); + lb->lbWorkerMap->add(env, lb->lbWorkerMap, name, ""); + + env->l->jkLog(env, env->l, JK_LOG_INFO, + "lb_worker.setAttribute(): Adding %s %s\n", lb->mbean->name, name); + } static int JK_METHOD jk2_lb_setProperty(jk_env_t *env, jk_bean_t *mbean, char *name, void *valueP) { - jk_worker_t *_this=mbean->object; + jk_worker_t *lb=mbean->object; char *value=valueP; int err; char **worker_names; unsigned num_of_workers; unsigned i = 0; char *tmp; - - /* XXX Add one-by-one */ if( strcmp( name, "balanced_workers") == 0 ) { - worker_names=jk2_config_split( env, _this->pool, + worker_names=jk2_config_split( env, lb->pool, value, NULL, &num_of_workers ); if( worker_names==NULL || num_of_workers==0 ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.validate(): no defined workers\n"); - return JK_FALSE; + return JK_ERR; } for(i = 0 ; i < num_of_workers ; i++) { - char *name = _this->pool->pstrdup(env, _this->pool, worker_names[i]); - _this->lbWorkerMap->add(env, _this->lbWorkerMap, name, ""); - env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb_worker.setAttribute(): Adding %s %s\n", _this->mbean->name, name); + jk2_lb_addWorker( env, lb, worker_names[i]); } - jk2_lb_initLbArray( env, _this ); - return JK_TRUE; + jk2_lb_refresh( env, lb ); + return JK_OK; + } else if( strcmp( name, "worker") == 0 ) { + jk2_lb_addWorker( env, lb, value); + jk2_lb_refresh( env, lb ); + return JK_OK; } - return JK_FALSE; + return JK_ERR; } -static int JK_METHOD jk2_lb_init(jk_env_t *env, jk_worker_t *_this) +static int JK_METHOD jk2_lb_init(jk_env_t *env, jk_worker_t *lb) { int err; char **worker_names; int i = 0; char *tmp; - int num_of_workers=_this->lbWorkerMap->size( env, _this->lbWorkerMap); + int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap); - err=jk2_lb_initLbArray(env, _this ); - if( err != JK_TRUE ) + err=jk2_lb_refresh(env, lb ); + if( err != JK_OK ) return err; - + + lb->ver=0; + if( lb->workerEnv->shm != NULL && lb->workerEnv->shm->head != NULL) + jk2_lb_updateWorkers(env, lb, lb->workerEnv->shm); + env->l->jkLog(env, env->l, JK_LOG_INFO, "lb.init() %s %d workers\n", - _this->mbean->name, _this->num_of_workers ); - - return JK_TRUE; + lb->mbean->name, lb->num_of_workers ); + + return JK_OK; } static int JK_METHOD jk2_lb_destroy(jk_env_t *env, jk_worker_t *w) @@ -413,7 +489,7 @@ if(w==NULL ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.destroy() NullPointerException\n"); - return JK_FALSE; + return JK_ERR; } /* Workers are destroyed by the workerEnv. It is possible @@ -427,7 +503,7 @@ w->pool->close(env, w->pool); - return JK_TRUE; + return JK_OK; } @@ -439,7 +515,7 @@ if(NULL == name ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.factory() NullPointerException\n"); - return JK_FALSE; + return JK_ERR; } w = (jk_worker_t *)pool->calloc(env, pool, sizeof(jk_worker_t)); @@ -447,7 +523,7 @@ if(w==NULL) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.factory() OutOfMemoryException\n"); - return JK_FALSE; + return JK_ERR; } w->pool=pool; @@ -468,6 +544,6 @@ w->workerEnv=env->getByName( env, "workerEnv" ); w->workerEnv->addWorker( env, w->workerEnv, w ); - return JK_TRUE; + return JK_OK; }
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>