details: https://hg.nginx.org/njs/rev/e3c442561889 branches: changeset: 2190:e3c442561889 user: Dmitry Volyntsev <xei...@nginx.com> date: Tue Sep 05 09:17:10 2023 -0700 description: Modules: added worker_affinity parameter for js_periodic directive.
The worker_affinity parameter specifies the set of workers on which the js_periodic handler should be executed. By default the js_periodic handler is executed only on worker 0. The parameter accepts a binary mask (one digit per worker process, starting with worker 0) or "all" to specify all workers. example.conf: worker_processes 4; ... location @periodics { # to be run at 1 minute intervals in worker 0 js_periodic main.handler interval=60s; # to be run at 1 minute intervals in all the workers js_periodic main.handler interval=60s worker_affinity=all; # to be run at 1 minute intervals in workers 1 and 3 js_periodic main.handler interval=60s worker_affinity=0101; } diffstat: nginx/ngx_http_js_module.c | 68 +++++++++++++++++++++++++++++++++++++++++++- nginx/ngx_stream_js_module.c | 68 +++++++++++++++++++++++++++++++++++++++++++- nginx/t/js_periodic.t | 67 +++++++++++++++--------------------------- nginx/t/stream_js_periodic.t | 66 +++++++++++++++--------------------------- 4 files changed, 182 insertions(+), 87 deletions(-) diffs (587 lines): diff -r 58d40fc80c52 -r e3c442561889 nginx/ngx_http_js_module.c --- a/nginx/ngx_http_js_module.c Thu Aug 31 08:24:17 2023 -0700 +++ b/nginx/ngx_http_js_module.c Tue Sep 05 09:17:10 2023 -0700 @@ -25,7 +25,7 @@ typedef struct { typedef struct { ngx_http_conf_ctx_t *conf_ctx; ngx_connection_t *connection; - void *padding; + uint8_t *worker_affinity; /** * fd is used for event debug and should be at the same position @@ -4544,6 +4544,16 @@ ngx_http_js_init_worker(ngx_cycle_t *cyc periodics = jmcf->periodics->elts; for (i = 0; i < jmcf->periodics->nelts; i++) { + if (periodics[i].worker_affinity != NULL + && !periodics[i].worker_affinity[ngx_worker]) + { + continue; + } + + if (periodics[i].worker_affinity == NULL && ngx_worker != 0) { + continue; + } + periodics[i].fd = 1000000 + i; if (ngx_http_js_periodic_init(&periodics[i]) != NGX_OK) { @@ -4558,9 +4568,11 @@ ngx_http_js_init_worker(ngx_cycle_t *cyc static char * ngx_http_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { + uint8_t *mask; ngx_str_t 
*value, s; ngx_msec_t interval, jitter; ngx_uint_t i; + ngx_core_conf_t *ccf; ngx_js_periodic_t *periodic; ngx_js_main_conf_t *jmcf; @@ -4586,6 +4598,7 @@ ngx_http_js_periodic(ngx_conf_t *cf, ngx ngx_memzero(periodic, sizeof(ngx_js_periodic_t)); + mask = NULL; jitter = 0; interval = 5000; @@ -4619,6 +4632,58 @@ ngx_http_js_periodic(ngx_conf_t *cf, ngx continue; } + if (ngx_strncmp(value[i].data, "worker_affinity=", 16) == 0) { + s.len = value[i].len - 16; + s.data = value[i].data + 16; + + ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, + ngx_core_module); + + if (ccf->worker_processes == NGX_CONF_UNSET) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "\"worker_affinity\" is not supported " + "with unset \"worker_processes\" directive"); + return NGX_CONF_ERROR; + } + + mask = ngx_palloc(cf->pool, ccf->worker_processes); + if (mask == NULL) { + return NGX_CONF_ERROR; + } + + if (ngx_strncmp(s.data, "all", 3) == 0) { + memset(mask, 1, ccf->worker_processes); + continue; + } + + if ((size_t) ccf->worker_processes != s.len) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "the number of " + "\"worker_processes\" is not equal to the " + "size of \"worker_affinity\" mask"); + return NGX_CONF_ERROR; + } + + for (i = 0; i < s.len; i++) { + if (s.data[i] == '0') { + mask[i] = 0; + continue; + } + + if (s.data[i] == '1') { + mask[i] = 1; + continue; + } + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid character \"%c\" in \"worker_affinity=\"", + s.data[i]); + + return NGX_CONF_ERROR; + } + + continue; + } + invalid: ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, @@ -4629,6 +4694,7 @@ invalid: periodic->method = value[1]; periodic->interval = interval; periodic->jitter = jitter; + periodic->worker_affinity = mask; periodic->conf_ctx = cf->ctx; return NGX_CONF_OK; diff -r 58d40fc80c52 -r e3c442561889 nginx/ngx_stream_js_module.c --- a/nginx/ngx_stream_js_module.c Thu Aug 31 08:24:17 2023 -0700 +++ b/nginx/ngx_stream_js_module.c Tue Sep 05 09:17:10 2023 -0700 @@ -30,7 
+30,7 @@ typedef struct { typedef struct { ngx_stream_conf_ctx_t *conf_ctx; ngx_connection_t *connection; - void *padding; + uint8_t *worker_affinity; /** * fd is used for event debug and should be at the same position @@ -2049,6 +2049,16 @@ ngx_stream_js_init_worker(ngx_cycle_t *c periodics = jmcf->periodics->elts; for (i = 0; i < jmcf->periodics->nelts; i++) { + if (periodics[i].worker_affinity != NULL + && !periodics[i].worker_affinity[ngx_worker]) + { + continue; + } + + if (periodics[i].worker_affinity == NULL && ngx_worker != 0) { + continue; + } + periodics[i].fd = 1000000 + i; if (ngx_stream_js_periodic_init(&periodics[i]) != NGX_OK) { @@ -2063,9 +2073,11 @@ ngx_stream_js_init_worker(ngx_cycle_t *c static char * ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { + uint8_t *mask; ngx_str_t *value, s; ngx_msec_t interval, jitter; ngx_uint_t i; + ngx_core_conf_t *ccf; ngx_js_periodic_t *periodic; ngx_js_main_conf_t *jmcf; @@ -2091,6 +2103,7 @@ ngx_stream_js_periodic(ngx_conf_t *cf, n ngx_memzero(periodic, sizeof(ngx_js_periodic_t)); + mask = NULL; jitter = 0; interval = 5000; @@ -2124,6 +2137,58 @@ ngx_stream_js_periodic(ngx_conf_t *cf, n continue; } + if (ngx_strncmp(value[i].data, "worker_affinity=", 16) == 0) { + s.len = value[i].len - 16; + s.data = value[i].data + 16; + + ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, + ngx_core_module); + + if (ccf->worker_processes == NGX_CONF_UNSET) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "\"worker_affinity\" is not supported " + "with unset \"worker_processes\" directive"); + return NGX_CONF_ERROR; + } + + mask = ngx_palloc(cf->pool, ccf->worker_processes); + if (mask == NULL) { + return NGX_CONF_ERROR; + } + + if (ngx_strncmp(s.data, "all", 3) == 0) { + memset(mask, 1, ccf->worker_processes); + continue; + } + + if ((size_t) ccf->worker_processes != s.len) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "the number of " + "\"worker_processes\" is not equal to the " + "size of 
\"worker_affinity\" mask"); + return NGX_CONF_ERROR; + } + + for (i = 0; i < s.len; i++) { + if (s.data[i] == '0') { + mask[i] = 0; + continue; + } + + if (s.data[i] == '1') { + mask[i] = 1; + continue; + } + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid character \"%c\" in \"worker_affinity=\"", + s.data[i]); + + return NGX_CONF_ERROR; + } + + continue; + } + invalid: ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, @@ -2134,6 +2199,7 @@ invalid: periodic->method = value[1]; periodic->interval = interval; periodic->jitter = jitter; + periodic->worker_affinity = mask; periodic->conf_ctx = cf->ctx; return NGX_CONF_OK; diff -r 58d40fc80c52 -r e3c442561889 nginx/t/js_periodic.t --- a/nginx/t/js_periodic.t Thu Aug 31 08:24:17 2023 -0700 +++ b/nginx/t/js_periodic.t Tue Sep 05 09:17:10 2023 -0700 @@ -23,12 +23,13 @@ use Test::Nginx; select STDERR; $| = 1; select STDOUT; $| = 1; -my $t = Test::Nginx->new()->has(qw/http/) +my $t = Test::Nginx->new()->has(qw/http rewrite/) ->write_file_expand('nginx.conf', <<'EOF'); %%TEST_GLOBALS%% daemon off; +worker_processes 4; events { } @@ -42,6 +43,7 @@ http { js_shared_dict_zone zone=nums:32k type=number; js_shared_dict_zone zone=strings:32k; + js_shared_dict_zone zone=workers:32k type=number; server { listen 127.0.0.1:8080; @@ -49,11 +51,12 @@ http { location @periodic { js_periodic test.tick interval=30ms jitter=1ms; - js_periodic test.timer interval=1s; + js_periodic test.timer interval=1s worker_affinity=all; js_periodic test.overrun interval=30ms; js_periodic test.file interval=1s; js_periodic test.fetch interval=40ms; js_periodic test.multiple_fetches interval=1s; + js_periodic test.affinity interval=50ms worker_affinity=0101; js_periodic test.fetch_exception interval=1s; js_periodic test.tick_exception interval=1s; @@ -69,6 +72,10 @@ http { return 200 'foo'; } + location /test_affinity { + js_content test.test_affinity; + } + location /test_fetch { js_content test.test_fetch; } @@ -102,11 +109,11 @@ my $p0 = port(8080); 
$t->write_file('test.js', <<EOF); import fs from 'fs'; + function affinity() { + ngx.shared.workers.set(ngx.worker_id, 1); + } + async function fetch() { - if (ngx.worker_id != 0) { - return; - } - let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok'); let body = await reply.text(); @@ -115,10 +122,6 @@ my $p0 = port(8080); } async function multiple_fetches() { - if (ngx.worker_id != 0) { - return; - } - let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok'); let reply2 = await ngx.fetch('http://127.0.0.1:$p0/fetch_foo'); let body = await reply.text(); @@ -128,18 +131,10 @@ my $p0 = port(8080); } async function fetch_exception() { - if (ngx.worker_id != 0) { - return; - } - let reply = await ngx.fetch('garbage'); } async function file() { - if (ngx.worker_id != 0) { - return; - } - let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+'); await fh.write('abc'); @@ -147,27 +142,15 @@ my $p0 = port(8080); } async function overrun() { - if (ngx.worker_id != 0) { - return; - } - setTimeout(() => {}, 100000); } function tick() { - if (ngx.worker_id != 0) { - return; - } - ngx.shared.nums.incr('tick', 1); } function tick_exception() { - if (ngx.worker_id != 0) { - return; - } - throw new Error("EXCEPTION"); } @@ -180,19 +163,11 @@ my $p0 = port(8080); } function timer_exception() { - if (ngx.worker_id != 0) { - return; - } - setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10); throw new Error("EXCEPTION"); } function timeout_exception() { - if (ngx.worker_id != 0) { - return; - } - setTimeout(() => { var v = ngx.shared.nums.get('timeout_exception') || 0; @@ -206,6 +181,10 @@ my $p0 = port(8080); }, 1); } + function test_affinity(r) { + r.return(200, `[\${ngx.shared.workers.keys().toSorted()}]`); + } + function test_fetch(r) { r.return(200, ngx.shared.strings.get('fetch').startsWith('okok')); } @@ -232,18 +211,20 @@ my $p0 = port(8080); r.return(200, ngx.shared.nums.get('timeout_exception') >= 2); } - export default { fetch, fetch_exception, 
file, multiple_fetches, overrun, - test_fetch, test_file, test_multiple_fetches, test_tick, - test_timeout_exception, test_timer, tick, tick_exception, - timer, timer_exception, timeout_exception }; + export default { affinity, fetch, fetch_exception, file, multiple_fetches, + overrun, test_affinity, test_fetch, test_file, + test_multiple_fetches, test_tick, test_timeout_exception, + test_timer, tick, tick_exception, timer, timer_exception, + timeout_exception }; EOF -$t->try_run('no js_periodic')->plan(7); +$t->try_run('no js_periodic')->plan(8); ############################################################################### select undef, undef, undef, 0.1; +like(http_get('/test_affinity'), qr/\[1,3]/, 'affinity test'); like(http_get('/test_tick'), qr/true/, '3x tick test'); like(http_get('/test_timer'), qr/true/, 'timer test'); like(http_get('/test_file'), qr/true/, 'file test'); diff -r 58d40fc80c52 -r e3c442561889 nginx/t/stream_js_periodic.t --- a/nginx/t/stream_js_periodic.t Thu Aug 31 08:24:17 2023 -0700 +++ b/nginx/t/stream_js_periodic.t Tue Sep 05 09:17:10 2023 -0700 @@ -24,12 +24,13 @@ use Test::Nginx::Stream qw/ stream /; select STDERR; $| = 1; select STDOUT; $| = 1; -my $t = Test::Nginx->new()->has(qw/http stream/) +my $t = Test::Nginx->new()->has(qw/http rewrite stream/) ->write_file_expand('nginx.conf', <<'EOF'); %%TEST_GLOBALS%% daemon off; +worker_processes 4; events { } @@ -43,16 +44,18 @@ stream { js_shared_dict_zone zone=nums:32k type=number; js_shared_dict_zone zone=strings:32k; + js_shared_dict_zone zone=workers:32k type=number; server { listen 127.0.0.1:8080; js_periodic test.tick interval=30ms jitter=1ms; - js_periodic test.timer interval=1s; + js_periodic test.timer interval=1s worker_affinity=all; js_periodic test.overrun interval=30ms; js_periodic test.file interval=1s; js_periodic test.fetch interval=40ms; js_periodic test.multiple_fetches interval=1s; + js_periodic test.affinity interval=50ms worker_affinity=0101; js_periodic 
test.fetch_exception interval=1s; js_periodic test.tick_exception interval=1s; @@ -89,11 +92,11 @@ my $p1 = port(8081); $t->write_file('test.js', <<EOF); import fs from 'fs'; + function affinity() { + ngx.shared.workers.set(ngx.worker_id, 1); + } + async function fetch() { - if (ngx.worker_id != 0) { - return; - } - let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok'); let body = await reply.text(); @@ -102,18 +105,10 @@ my $p1 = port(8081); } async function fetch_exception() { - if (ngx.worker_id != 0) { - return; - } - let reply = await ngx.fetch('garbage'); } async function multiple_fetches() { - if (ngx.worker_id != 0) { - return; - } - let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok'); let reply2 = await ngx.fetch('http://127.0.0.1:$p1/fetch_foo'); let body = await reply.text(); @@ -123,10 +118,6 @@ my $p1 = port(8081); } async function file() { - if (ngx.worker_id != 0) { - return; - } - let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+'); await fh.write('abc'); @@ -134,26 +125,14 @@ my $p1 = port(8081); } async function overrun() { - if (ngx.worker_id != 0) { - return; - } - setTimeout(() => {}, 100000); } function tick() { - if (ngx.worker_id != 0) { - return; - } - ngx.shared.nums.incr('tick', 1); } function tick_exception() { - if (ngx.worker_id != 0) { - return; - } - throw new Error("EXCEPTION"); } @@ -166,19 +145,11 @@ my $p1 = port(8081); } function timer_exception() { - if (ngx.worker_id != 0) { - return; - } - setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10); throw new Error("EXCEPTION"); } function timeout_exception() { - if (ngx.worker_id != 0) { - return; - } - setTimeout(() => { var v = ngx.shared.nums.get('timeout_exception') || 0; @@ -196,6 +167,15 @@ my $p1 = port(8081); s.on('upload', function (data) { if (data.length > 0) { switch (data) { + case 'affinity': + if (ngx.shared.workers.keys().toSorted().toString() + == '1,3') + { + s.done(); + return; + } + + break; case 'fetch': if 
(ngx.shared.strings.get('fetch').startsWith('okok')) { s.done(); @@ -258,19 +238,21 @@ my $p1 = port(8081); }); } - export default { fetch, fetch_exception, multiple_fetches, file, overrun, - test, tick, tick_exception, timer, timer_exception, - timeout_exception }; + export default { affinity, fetch, fetch_exception, multiple_fetches, file, + overrun, test, tick, tick_exception, timer, + timer_exception, timeout_exception }; EOF $t->run_daemon(\&stream_daemon, port(8090)); -$t->try_run('no js_periodic')->plan(7); +$t->try_run('no js_periodic')->plan(8); $t->waitforsocket('127.0.0.1:' . port(8090)); ############################################################################### select undef, undef, undef, 0.1; +is(stream('127.0.0.1:' . port(8080))->io('affinity'), 'affinity', + 'affinity test'); is(stream('127.0.0.1:' . port(8080))->io('tick'), 'tick', '3x tick test'); is(stream('127.0.0.1:' . port(8080))->io('timer'), 'timer', 'timer test'); is(stream('127.0.0.1:' . port(8080))->io('file'), 'file', 'file test'); _______________________________________________ nginx-devel mailing list nginx-devel@nginx.org https://mailman.nginx.org/mailman/listinfo/nginx-devel