diff --git a/modules/ngx_http_upstream_dyups_module/ngx_http_dyups_module.c b/modules/ngx_http_upstream_dyups_module/ngx_http_dyups_module.c index 8135ad4336..0b6f632464 100644 --- a/modules/ngx_http_upstream_dyups_module/ngx_http_dyups_module.c +++ b/modules/ngx_http_upstream_dyups_module/ngx_http_dyups_module.c @@ -36,6 +36,7 @@ typedef struct { typedef struct { ngx_flag_t enable; ngx_flag_t trylock; + ngx_flag_t serial_update; ngx_array_t dy_upstreams;/* ngx_http_dyups_srv_conf_t */ ngx_str_t shm_name; ngx_uint_t shm_size; @@ -72,6 +73,7 @@ typedef struct ngx_dyups_shctx_s { ngx_queue_t msg_queue; ngx_uint_t version; ngx_dyups_status_t *status; + ngx_atomic_uint_t flag; } ngx_dyups_shctx_t; @@ -79,6 +81,7 @@ typedef struct ngx_dyups_global_ctx_s { ngx_event_t msg_timer; ngx_slab_pool_t *shpool; ngx_dyups_shctx_t *sh; + ngx_int_t workers; } ngx_dyups_global_ctx_t; @@ -216,6 +219,13 @@ static ngx_command_t ngx_http_dyups_commands[] = { NGX_HTTP_MAIN_CONF_OFFSET, offsetof(ngx_http_dyups_main_conf_t, trylock), NULL }, + + { ngx_string("dyups_serial_update_upstream"), + NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1, + ngx_conf_set_flag_slot, + NGX_HTTP_MAIN_CONF_OFFSET, + offsetof(ngx_http_dyups_main_conf_t, serial_update), + NULL }, ngx_null_command }; @@ -318,6 +328,7 @@ ngx_http_dyups_create_main_conf(ngx_conf_t *cf) dmcf->read_msg_timeout = NGX_CONF_UNSET_MSEC; dmcf->read_msg_log = NGX_CONF_UNSET; dmcf->trylock = NGX_CONF_UNSET; + dmcf->serial_update = NGX_CONF_UNSET; return dmcf; } @@ -349,6 +360,10 @@ ngx_http_dyups_init_main_conf(ngx_conf_t *cf, void *conf) if (dmcf->shm_size == NGX_CONF_UNSET_UINT) { dmcf->shm_size = 2 * 1024 * 1024; } + + if (dmcf->serial_update == NGX_CONF_UNSET) { + dmcf->serial_update = 0; + } return ngx_http_dyups_init_shm(cf, conf); } @@ -427,6 +442,7 @@ ngx_http_dyups_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) sh->version = 0; sh->status = NULL; + sh->flag = 0; return NGX_OK; } @@ -562,6 +578,8 @@ ngx_http_dyups_init_process(ngx_cycle_t *cycle) } ngx_http_dyups_api_enable = 1; + + ngx_dyups_global_ctx.workers = ccf->worker_processes; timer = &ngx_dyups_global_ctx.msg_timer; ngx_memzero(timer, sizeof(ngx_event_t)); @@ -2063,9 +2081,13 @@ ngx_http_dyups_read_msg(ngx_event_t *ev) ngx_slab_pool_t *shpool; ngx_http_dyups_srv_conf_t *duscfs, *duscf; ngx_http_dyups_main_conf_t *dmcf; + ngx_dyups_shctx_t *sh; + ngx_uint_t workers; dmcf = ev->data; shpool = ngx_dyups_global_ctx.shpool; + sh = ngx_dyups_global_ctx.sh; + workers = ngx_dyups_global_ctx.workers; count = 0; s_count = 0; @@ -2096,7 +2118,16 @@ ngx_http_dyups_read_msg(ngx_event_t *ev) } #if (NGX_HTTP_UPSTREAM_CHECK) - if (!ngx_shmtx_trylock(&shpool->mutex)) { + if (dmcf->serial_update) { + if (sh->flag % workers != ngx_worker) { + ngx_log_error(NGX_LOG_DEBUG, ev->log, 0, + "[dyups] ngx_pid: %P, ngx_worker: %d, sh->flag: %d, not meet lock condition", ngx_pid, ngx_worker, sh->flag); + ngx_dyups_add_timer(ev, dmcf->read_msg_timeout); + return; + } + ngx_shmtx_lock(&shpool->mutex); + } + else if (!ngx_shmtx_trylock(&shpool->mutex)) { goto finish; } #else @@ -2108,6 +2139,14 @@ ngx_http_dyups_read_msg(ngx_event_t *ev) ngx_shmtx_unlock(&shpool->mutex); #if (NGX_HTTP_UPSTREAM_CHECK) + if (dmcf->serial_update) { + ngx_atomic_fetch_add(&sh->flag, 1); + + ngx_atomic_cmp_set(&sh->flag, workers, 0); + + ngx_log_error(NGX_LOG_DEBUG, ev->log, 0, + "[dyups] ngx_pid: %P, ngx_worker: %d, sh->flag: %d, finish read msg under locking condition", ngx_pid, ngx_worker, sh->flag); + } finish: #endif ngx_dyups_add_timer(ev, dmcf->read_msg_timeout); @@ -2129,6 +2168,7 @@ ngx_http_dyups_read_msg_locked(ngx_event_t *ev) ngx_dyups_msg_t *msg; ngx_dyups_shctx_t *sh; ngx_dyups_status_t *status; + ngx_http_dyups_main_conf_t *dmcf; ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] read msg %P", ngx_pid); @@ -2136,6 +2176,7 @@ ngx_http_dyups_read_msg_locked(ngx_event_t *ev) ccf = (ngx_core_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx, ngx_core_module); + dmcf = ev->data; sh = ngx_dyups_global_ctx.sh; shpool = ngx_dyups_global_ctx.shpool;