mturk 2004/12/07 04:24:53 Modified: jk/native/common jk_lb_worker.c jk_lb_worker.h jk_util.c jk_util.h Log: Make jk replication aware using domain model. Great patch from Rainer Jung. Revision Changes Path 1.33 +219 -48 jakarta-tomcat-connectors/jk/native/common/jk_lb_worker.c Index: jk_lb_worker.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native/common/jk_lb_worker.c,v retrieving revision 1.32 retrieving revision 1.33 diff -u -r1.32 -r1.33 --- jk_lb_worker.c 26 Nov 2004 16:59:51 -0000 1.32 +++ jk_lb_worker.c 7 Dec 2004 12:24:53 -0000 1.33 @@ -40,6 +40,16 @@ #define WAIT_BEFORE_RECOVER (60*1) #define WORKER_RECOVER_TIME ("recover_time") +static const char *search_types[] = { + "none", + "sticky", + "sticky domain", + "local", + "local domain", + "any", + NULL +}; + /** * Worker record should be inside shared * memory for correct behavior. @@ -49,9 +59,11 @@ struct worker_record { char *name; + char *domain; int lb_factor; int lb_value; int is_local_worker; + int is_local_domain; int in_error_state; int in_recovering; time_t error_time; @@ -63,6 +75,7 @@ { worker_record_t *lb_workers; unsigned num_of_workers; + unsigned num_of_local_workers; jk_pool_t p; jk_pool_atom_t buf[TINY_POOL_SIZE]; @@ -198,77 +211,210 @@ } } + +int is_worker_candidate(worker_record_t *wr, + int search_id, + const char *search_string, + jk_logger_t *l) +{ + switch (search_id) { + case 1: + if (strcmp(search_string, wr->name) == 0) { + return JK_TRUE; + } + break; + case 2: + if (strcmp(search_string, wr->domain) == 0) { + return JK_TRUE; + } + break; + case 3: + if (wr->is_local_worker) { + return JK_TRUE; + } + break; + case 4: + if (wr->is_local_domain) { + return JK_TRUE; + } + break; + case 5: + return JK_TRUE; + } + jk_log(l, JK_LOG_ERROR, + "wrong search id %d\n", + search_id); + return JK_FALSE; +} + +static worker_record_t *get_suitable_worker(lb_worker_t *p, + int search_id, + const char *search_string, + int start, + int stop, + int use_lb_factor, + int *domain_id, + jk_logger_t *l) +{ + + worker_record_t *rc = NULL; + int lb_max = 0; + int total_factor = 0; + const char *search_type = search_types[search_id]; + int i; + + *domain_id = -1; + + jk_log(l, JK_LOG_DEBUG, + "searching for %s worker (%s)\n", + search_type, search_string); + for (i = start; i < stop; i++) { + jk_log(l, JK_LOG_DEBUG, + "testing worker %s (%d) for match with %s (%s)\n", + p->lb_workers[i].name, i, search_type, search_string); + if (is_worker_candidate(&(p->lb_workers[i]), search_id, search_string, l)) { + if (search_id == 1) { + *domain_id = i; + } + if (!p->lb_workers[i].in_error_state || !p->lb_workers[i].in_recovering) { + jk_log(l, JK_LOG_DEBUG, + "found candidate worker %s (%d) with previous load %d in search with %s (%s)\n", + p->lb_workers[i].name, i, p->lb_workers[i].lb_value, search_type, search_string); + + if (p->lb_workers[i].in_error_state) { + + time_t now = time(0); + int elapsed = now - p->lb_workers[i].error_time; + if (elapsed <= p->recover_wait_time) { + jk_log(l, JK_LOG_DEBUG, + "worker candidate %s (%d) is in error state - will not yet recover (%d < %d)\n", + p->lb_workers[i].name, i, elapsed, p->recover_wait_time); + continue; + } + } + + if (use_lb_factor) { + p->lb_workers[i].lb_value += p->lb_workers[i].lb_factor; + total_factor += p->lb_workers[i].lb_factor; + if (p->lb_workers[i].lb_value > lb_max || !rc) { + lb_max = p->lb_workers[i].lb_value; + rc = &(p->lb_workers[i]); + jk_log(l, JK_LOG_DEBUG, + "new maximal worker %s (%d) with previous load %d in search with %s (%s)\n", + rc->name, i, rc->lb_value, search_type, search_string); + } + } else { + rc = &(p->lb_workers[i]); + break; + } + } else if (JK_IS_DEBUG_LEVEL(l) ) { + jk_log(l, JK_LOG_TRACE, + "worker candidate %s (%d) is in error state - already recovers\n", + p->lb_workers[i].name, i); + } + } + } + + if (rc) { + if (rc->in_error_state) { + time_t now = time(0); + rc->in_recovering = JK_TRUE; + rc->error_time = now; + jk_log(l, JK_LOG_DEBUG, + "found worker %s is in error state - will recover\n", + rc->name); + } + rc->lb_value -= total_factor; + jk_log(l, JK_LOG_DEBUG, + "found worker %s with new load %d in search with %s (%s)\n", + rc->name, rc->lb_value, search_type, search_string); + return rc; + } + jk_log(l, JK_LOG_DEBUG, + "found no %s (%s) worker\n", + search_type, search_string); + return rc; +} + static worker_record_t *get_most_suitable_worker(lb_worker_t * p, jk_ws_service_t *s, - int attempt) + int attempt, + jk_logger_t *l) { worker_record_t *rc = NULL; - unsigned i; char *sessionid = NULL; int total_factor = 0; + int domain_id =-1; + JK_TRACE_ENTER(l); if (p->sticky_session) { sessionid = get_sessionid(s); } + jk_log(l, JK_LOG_DEBUG, + "total sessionid is %s.\n", + sessionid ? sessionid : "empty"); while (sessionid) { char *next = strchr(sessionid, ';'); char *session_route; + char *session_domain; if (next) { *next++ = '\0'; } + jk_log(l, JK_LOG_DEBUG, + "searching worker for partial sessionid %s.\n", + sessionid); session_route = strchr(sessionid, '.'); if (session_route) { ++session_route; - for (i = 0; i < p->num_of_workers; i++) { - if (0 == strcmp(session_route, p->lb_workers[i].name)) { - /* First attempt will allways be to the - correct host. If this is indeed down and no - hope of recovery, we'll go to fail-over - */ - if (attempt > 0 && p->lb_workers[i].in_error_state) { - next = NULL; /* Double break; */ - break; - } - else { - return &(p->lb_workers[i]); - } - } - } - } - sessionid = next; - } - for (i = 0; i < p->num_of_workers; i++) { - if (!p->in_local_worker_mode || p->lb_workers[i].is_local_worker - || !p->local_worker_only) { - if (p->lb_workers[i].in_error_state) { - if (!p->lb_workers[i].in_recovering) { - time_t now = time(0); - if ((now - p->lb_workers[i].error_time) > - p->recover_wait_time) { - p->lb_workers[i].in_recovering = JK_TRUE; - p->lb_workers[i].error_time = now; - rc = &(p->lb_workers[i]); + rc = get_suitable_worker(p, 1, session_route, 0, p->num_of_workers, 0, &domain_id, l); + if (rc) { + JK_TRACE_EXIT(l); + return rc; + } - break; - } - } + if (domain_id >= 0 && domain_id < (int)p->num_of_workers) { + session_domain = p->lb_workers[domain_id].domain; } else { - p->lb_workers[i].lb_value += p->lb_workers[i].lb_factor; - total_factor += p->lb_workers[i].lb_factor; - if (!rc || p->lb_workers[i].lb_value > rc->lb_value) - rc = &(p->lb_workers[i]); + session_domain = JK_LB_DEF_DOMAIN_NAME; + } + jk_log(l, JK_LOG_DEBUG, + "found domain %s in route %s\n", + session_domain, session_route); + + rc = get_suitable_worker(p, 2, session_domain, 0, p->num_of_workers, 1, &domain_id, l); + if (rc) { + JK_TRACE_EXIT(l); + return rc; } + } + sessionid = next; } + + + rc = get_suitable_worker(p, 3, "any", 0, p->num_of_local_workers, 1, &domain_id, l); if (rc) { - rc->lb_value -= total_factor; + JK_TRACE_EXIT(l); + return rc; + } + + if (p->local_worker_only) { + JK_TRACE_EXIT(l); + return NULL; + } + + rc = get_suitable_worker(p, 4, "any", p->num_of_local_workers, p->num_of_workers, 1, &domain_id, l); + if (rc) { + JK_TRACE_EXIT(l); + return rc; } - + rc = get_suitable_worker(p, 5, "any", p->num_of_local_workers, p->num_of_workers, 1, &domain_id, l); + JK_TRACE_EXIT(l); return rc; } @@ -297,7 +443,7 @@ while (1) { worker_record_t *rec = - get_most_suitable_worker(p->worker, s, attempt++); + get_most_suitable_worker(p->worker, s, attempt++, l); int rc; if (rec) { @@ -396,17 +542,20 @@ if (pThis && pThis->worker_private) { lb_worker_t *p = pThis->worker_private; char **worker_names; - unsigned num_of_workers; + unsigned int num_of_workers; + unsigned int num_of_local_workers; + p->in_local_worker_mode = JK_FALSE; p->local_worker_only = jk_get_local_worker_only_flag(props, p->name); p->sticky_session = jk_get_is_sticky_session(props, p->name); + p->num_of_local_workers = 0; if (jk_get_lb_worker_list(props, p->name, &worker_names, &num_of_workers) && num_of_workers) { - unsigned i = 0; - unsigned j = 0; + unsigned int i = 0; + unsigned int j = 0; p->lb_workers = jk_pool_alloc(&p->p, num_of_workers * @@ -425,7 +574,12 @@ if (p->lb_workers[i].lb_factor < 1) { p->lb_workers[i].lb_factor = 1; } - + p->lb_workers[i].domain = jk_get_worker_domain(props, + worker_names[i], + JK_LB_DEF_DOMAIN_NAME); + if (!p->lb_workers[i].domain) { + p->lb_workers[i].domain = JK_LB_DEF_DOMAIN_NAME; + } p->lb_workers[i].is_local_worker = jk_get_is_local_worker(props, worker_names[i]); if (p->lb_workers[i].is_local_worker) @@ -457,6 +611,7 @@ j++; } } + num_of_local_workers = j; if (!p->in_local_worker_mode) { p->local_worker_only = JK_FALSE; @@ -470,10 +625,24 @@ } else { + for (i = 0; i < num_of_local_workers; i++) { + p->lb_workers[i].is_local_domain=1; + } + for (i = num_of_local_workers; i < num_of_workers; i++) { + p->lb_workers[i].is_local_domain=0; + for (j = 0; j < num_of_local_workers; j++) { + if(0 == strcmp(p->lb_workers[i].domain, p->lb_workers[j].domain)) { + p->lb_workers[i].is_local_domain=1; + break; + } + } + } + for (i = 0; i < num_of_workers; i++) { jk_log(l, JK_LOG_DEBUG, - "Balanced worker %i has name %s\n", - i, p->lb_workers[i].name); + "Balanced worker %i has name %s in domain %s and has local=%d and local_domain=%d\n", + i, p->lb_workers[i].name, p->lb_workers[i].domain, + p->lb_workers[i].is_local_worker, p->lb_workers[i].is_local_domain); } jk_log(l, JK_LOG_DEBUG, "in_local_worker_mode: %s\n", @@ -482,6 +651,7 @@ "local_worker_only: %s\n", (p->local_worker_only ? "true" : "false")); p->num_of_workers = num_of_workers; + p->num_of_local_workers = num_of_local_workers; JK_TRACE_EXIT(l); return JK_TRUE; } @@ -576,6 +746,7 @@ private_data->lb_workers = NULL; private_data->num_of_workers = 0; + private_data->num_of_local_workers = 0; private_data->worker.worker_private = private_data; private_data->worker.validate = validate; private_data->worker.init = init; 1.9 +3 -3 jakarta-tomcat-connectors/jk/native/common/jk_lb_worker.h Index: jk_lb_worker.h =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native/common/jk_lb_worker.h,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- jk_lb_worker.h 8 Nov 2004 13:30:14 -0000 1.8 +++ jk_lb_worker.h 7 Dec 2004 12:24:53 -0000 1.9 @@ -31,7 +31,8 @@ { #endif /* __cplusplus */ -#define JK_LB_WORKER_NAME ("lb") +#define JK_LB_WORKER_NAME ("lb") +#define JK_LB_DEF_DOMAIN_NAME ("unknown") int JK_METHOD lb_worker_factory(jk_worker_t **w, const char *name, jk_logger_t *l); @@ -39,5 +40,4 @@ #ifdef __cplusplus } #endif /* __cplusplus */ - #endif /* JK_LB_WORKER_H */ 1.45 +28 -18 jakarta-tomcat-connectors/jk/native/common/jk_util.c Index: jk_util.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native/common/jk_util.c,v retrieving revision 1.44 retrieving revision 1.45 diff -u -r1.44 -r1.45 --- jk_util.c 7 Dec 2004 10:54:10 -0000 1.44 +++ jk_util.c 7 Dec 2004 12:24:53 -0000 1.45 @@ -59,6 +59,7 @@ #define STICKY_SESSION ("sticky_session") #define LOCAL_WORKER_ONLY_FLAG ("local_worker_only") #define LOCAL_WORKER_FLAG ("local_worker") +#define DOMAIN_OF_WORKER ("domain") #define DEFAULT_WORKER_TYPE JK_AJP13_WORKER_NAME #define SECRET_KEY_OF_WORKER ("secretkey") @@ -339,6 +340,16 @@ return jk_map_get_string(m, buf, DEFAULT_WORKER_TYPE); } +char *jk_get_worker_domain(jk_map_t *m, const char *wname, const char *def) +{ + char buf[1024]; + if (!m || !wname) { + return NULL; + } + sprintf(buf, "%s.%s.%s", PREFIX_OF_WORKER, wname, DOMAIN_OF_WORKER); + return jk_map_get_string(m, buf, def); +} + char *jk_get_worker_secret(jk_map_t *m, const char *wname) { char buf[1024]; @@ -1015,20 +1026,19 @@ s->retries = JK_RETRIES; } -#ifdef _MT_CODE_PTHREAD - -int jk_gettid() -{ - pthread_t t = pthread_self(); - -#ifdef AS400 - /* OS400 use 64 bits ThreadId, get only low 32 bits for now */ - pthread_id_np_t tid; - pthread_getunique_np(&t, &tid); - return ((int)(tid.intId.lo & 0xFFFFFFFF)); -#else - return (int)(t & 0xFFFF); -#endif /* AS400 */ -} - -#endif +#ifdef _MT_CODE_PTHREAD
+int jk_gettid() +{ + pthread_t t = pthread_self(); +#ifdef AS400 + /* OS400 use 64 bits ThreadId, get only low 32 bits for now */ + pthread_id_np_t tid; + pthread_getunique_np(&t, &tid); + return ((int)(tid.intId.lo & 0xFFFFFFFF)); +#else + int tid = 0; + pthread_getunique_np(&t, &tid); + return tid; +#endif /* AS400 */ +} +#endif 1.22 +3 -1 jakarta-tomcat-connectors/jk/native/common/jk_util.h Index: jk_util.h =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native/common/jk_util.h,v retrieving revision 1.21 retrieving revision 1.22 diff -u -r1.21 -r1.22 --- jk_util.h 12 Nov 2004 18:45:24 -0000 1.21 +++ jk_util.h 7 Dec 2004 12:24:53 -0000 1.22 @@ -70,6 +70,8 @@ int jk_get_worker_recycle_timeout(jk_map_t *m, const char *wname, int def); +char *jk_get_worker_domain(jk_map_t *m, const char *wname, const char *def); + char *jk_get_worker_secret_key(jk_map_t *m, const char *wname); int jk_get_worker_retries(jk_map_t *m, const char *wname, int def); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]