diff --git a/README.md b/README.md index fe9191e..6ecb5e5 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,8 @@ NOTE: One can use "Last-Modified" or "Etag" HTTP header in response to prevent wasted upstream refresh actions, Especially when thousands serverlists and upstreams configured. +NOTE: Will also segfault at runtime if you leave out the syntax for serverlist upstream in the config. + ## Directives ### serverlist_service * Syntax: `serverlist_service url=http://xxx/ [conf_dump_dir=dumped_dir/] [interval=5s] [timeout=2s] [concurrency=1];` diff --git a/ngx_http_upstream_serverlist.c b/ngx_http_upstream_serverlist.c index 2fe5c45..9337c61 100644 --- a/ngx_http_upstream_serverlist.c +++ b/ngx_http_upstream_serverlist.c @@ -48,10 +48,12 @@ typedef struct { typedef struct { ngx_http_conf_ctx_t *conf_ctx; ngx_pool_t *conf_pool; + ngx_pool_t *prev_conf_pool; ngx_array_t service_conns; ngx_array_t serverlists; ngx_uint_t service_concurrency; + ngx_int_t conf_pool_count; ngx_url_t service_url; ngx_str_t conf_dump_dir; } main_conf; @@ -74,9 +76,6 @@ init_module(ngx_cycle_t *cycle); static ngx_int_t init_process(ngx_cycle_t *cycle); -static void -exit_process(ngx_cycle_t *cycle); - static void refresh_timeout_handler(ngx_event_t *ev); @@ -134,7 +133,7 @@ ngx_module_t ngx_http_upstream_serverlist_module = { init_process, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ - exit_process, /* exit process */ + NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING }; @@ -170,7 +169,7 @@ create_main_conf(ngx_conf_t *cf) { return NULL; } - if (ngx_array_init(&mcf->service_conns, cf->pool, 1, + if (ngx_array_init(&mcf->service_conns, cf->pool, 1, sizeof(service_conn)) != NGX_OK) { return NULL; } @@ -183,6 +182,8 @@ create_main_conf(ngx_conf_t *cf) { mcf->service_concurrency = DEFAULT_SERVICE_CONCURRENCY; mcf->conf_ctx = cf->ctx; mcf->conf_pool = cf->pool; + mcf->conf_pool_count = 0; + return mcf; } @@ -456,29 +457,6 @@ init_process(ngx_cycle_t *cycle) { return NGX_OK; } -static void -exit_process(ngx_cycle_t *cycle) { - main_conf *mcf = ngx_http_cycle_get_module_main_conf(cycle, - ngx_http_upstream_serverlist_module); - serverlist *sls = mcf->serverlists.elts; - service_conn *scs = mcf->service_conns.elts; - ngx_uint_t i; - - for (i = 0; i < mcf->serverlists.nelts; i++) { - if (sls[i].pool) { - ngx_destroy_pool(sls[i].pool); - sls[i].pool = NULL; - } - } - - for (i = 0; i < mcf->service_conns.nelts; i++) { - if (scs[i].peer_conn.connection) { - ngx_close_connection(scs[i].peer_conn.connection); - scs[i].peer_conn.connection = NULL; - } - } -} - static void empty_handler(ngx_event_t *ev) { ngx_log_debug(NGX_LOG_DEBUG_ALL, ev->log, 0, @@ -833,6 +811,7 @@ get_one_line(u_char *buf, u_char *buf_end, ngx_str_t *line) { static ngx_array_t * get_servers(ngx_pool_t *pool, ngx_str_t *body, ngx_log_t *log) { ngx_int_t ret = -1; + // this is the pool that needs to be cleared ngx_array_t *servers = ngx_array_create(pool, 2, sizeof(ngx_http_upstream_server_t)); ngx_http_upstream_server_t *server = NULL; @@ -871,6 +850,7 @@ get_servers(ngx_pool_t *pool, ngx_str_t *body, ngx_log_t *log) { break; } + // this causes the memory leak when servers are never removed server = ngx_array_push(servers); ngx_memzero(server, sizeof *server); server->name = u.url; @@ -1091,14 +1071,77 @@ refresh_upstream(serverlist *sl, ngx_str_t *body, ngx_log_t *log) { main_conf *mcf = ngx_http_cycle_get_module_main_conf(ngx_cycle, ngx_http_upstream_serverlist_module); ngx_http_upstream_srv_conf_t *uscf = sl->upstream_conf; - ngx_http_upstream_init_pt init = NULL; ngx_conf_t cf = {0}; - ngx_array_t *new_servers = NULL; - ngx_array_t *old_servers = uscf->servers; + ngx_array_t *new_servers = NULL; + + // create new temp main_conf with a new pools, new service_conns and new serverlists, copy info from existing conf except for the pools, service_conns and serverlists + cf.pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, log); + + ngx_http_conf_ctx_t *ctx = NULL; + ctx = ngx_pcalloc(cf.pool, sizeof(ngx_http_conf_ctx_t)); + if (ctx == NULL) { + return -1; + } + ctx = mcf->conf_ctx; + cf.ctx = ctx; - // use mcf->pool, avoid coredump, will lead mem leak - // new_servers = get_servers(sl->new_pool, body, log); - new_servers = get_servers(mcf->conf_pool, body, log); + main_conf *tmp_mcf = create_main_conf(&cf); + + //copy over previous count + tmp_mcf->conf_pool_count = mcf->conf_pool_count; + + + // create new server list + serverlist *new_sl = NULL; + new_sl = ngx_array_push(&tmp_mcf->serverlists); + if (new_sl == NULL) { + ngx_log_error(NGX_LOG_ERR, log, 0, + "upstream-serverlist: temp serverlists conf %V failed", uscf->host); + return -1; + } + ngx_memzero(new_sl, sizeof *new_sl); + new_sl->upstream_conf = uscf; + new_sl->last_modified = -1; + new_sl->name = uscf->host; + + tmp_mcf->service_concurrency = mcf->service_concurrency; + tmp_mcf->service_url = mcf->service_url; + tmp_mcf->conf_dump_dir = mcf->conf_dump_dir; + + //create new conns + service_conn *new_sc = NULL; + new_sc = ngx_array_push(&tmp_mcf->service_conns); + ngx_memzero(new_sc, sizeof *new_sc); + new_sc->send.start = ngx_pcalloc(tmp_mcf->conf_pool, MAX_HTTP_REQUEST_SIZE); + if (new_sc->send.start == NULL) { + ngx_log_error(NGX_LOG_ERR, log, 0, + "upstream-serverlist: new allocate send buffer failed"); + return NGX_ERROR; + } + new_sc->send.end = new_sc->send.start + MAX_HTTP_REQUEST_SIZE; + new_sc->send.last = new_sc->send.pos = new_sc->send.start; + + new_sc->recv.start = ngx_pcalloc(tmp_mcf->conf_pool, ngx_pagesize); + if (new_sc->recv.start == NULL) { + ngx_log_error(NGX_LOG_ERR, log, 0, + "upstream-serverlist: new allocate recv buffer failed"); + return NGX_ERROR; + } + new_sc->recv.end = new_sc->recv.start + MAX_HTTP_REQUEST_SIZE; + new_sc->recv.last = new_sc->recv.pos = new_sc->recv.start; + + ngx_memzero(&new_sc->peer_conn, sizeof new_sc->peer_conn); + new_sc->peer_conn.data = NULL; + new_sc->peer_conn.log = log; + new_sc->peer_conn.log_error = NGX_ERROR_ERR; + new_sc->peer_conn.connection = NULL; + new_sc->peer_conn.get = ngx_event_get_peer; + new_sc->peer_conn.name = &tmp_mcf->service_url.host; + new_sc->peer_conn.sockaddr = &tmp_mcf->service_url.sockaddr.sockaddr; + new_sc->peer_conn.socklen = tmp_mcf->service_url.socklen; + + //new_servers = get_servers(mcf->conf_pool, body, log); + new_servers = get_servers(tmp_mcf->conf_pool, body, log); if (new_servers == NULL || new_servers->nelts <= 0) { ngx_log_error(NGX_LOG_ERR, log, 0, "upstream-serverlist: parse serverlist %V failed", &sl->name); @@ -1106,6 +1149,11 @@ refresh_upstream(serverlist *sl, ngx_str_t *body, ngx_log_t *log) { } if (!upstream_servers_changed(uscf->servers, new_servers)) { + if (tmp_mcf->conf_pool != NULL) { + // destry temp pool + ngx_destroy_pool(tmp_mcf->conf_pool); + tmp_mcf->conf_pool = NULL; + } ngx_log_debug(NGX_LOG_INFO, log, 0, "upstream-serverlist: serverlist %V nothing changed",&sl->name); // once return -1, everything in the old pool will kept and the new pool @@ -1113,29 +1161,64 @@ refresh_upstream(serverlist *sl, ngx_str_t *body, ngx_log_t *log) { return -1; } - init = uscf->peer.init_upstream ? uscf->peer.init_upstream - : ngx_http_upstream_init_round_robin; ngx_memzero(&cf, sizeof cf); cf.name = "serverlist_init_upstream"; cf.cycle = (ngx_cycle_t *) ngx_cycle; - // cf.pool = sl->new_pool; - cf.pool = mcf->conf_pool; + + + cf.pool = tmp_mcf->conf_pool; cf.module_type = NGX_HTTP_MODULE; cf.cmd_type = NGX_HTTP_MAIN_CONF; cf.log = ngx_cycle->log; - cf.ctx = mcf->conf_ctx; + cf.ctx = tmp_mcf->conf_ctx; - old_servers = uscf->servers; + ngx_array_t *old_servers = uscf->servers; uscf->servers = new_servers; - if (init(&cf, uscf) != NGX_OK) { + ngx_array_t *old_service_conns = &mcf->service_conns; + ngx_array_t *old_serverlists = &mcf->serverlists; + + ngx_uint_t blocksize = 0; + if (tmp_mcf->serverlists.nelts >= tmp_mcf->service_concurrency) { + blocksize = (tmp_mcf->serverlists.nelts + (tmp_mcf->service_concurrency - 1)) + / tmp_mcf->service_concurrency; + } else { + blocksize = 1; + } + + new_sc->serverlists_start = ngx_min(tmp_mcf->serverlists.nelts, + 0 + blocksize); + new_sc->serverlists_end = ngx_min(tmp_mcf->serverlists.nelts, + new_sc->serverlists_start + blocksize); + new_sc->serverlists_curr = new_sc->serverlists_start; + + for (ngx_uint_t i = 0; i < tmp_mcf->service_conns.nelts; i++) { + service_conn *tmp_sc = (service_conn *)tmp_mcf->service_conns.elts + i; + tmp_sc->timeout_timer.handler = refresh_timeout_handler; + tmp_sc->timeout_timer.log = log; + tmp_sc->timeout_timer.data = tmp_sc; + tmp_sc->refresh_timer.handler = connect_to_service; + tmp_sc->refresh_timer.log = log; + tmp_sc->refresh_timer.data = tmp_sc; + if ((ngx_uint_t)tmp_sc->serverlists_start < tmp_mcf->serverlists.nelts) { + ngx_add_timer(&tmp_sc->refresh_timer, random_interval_ms()); + } + } + + + if (ngx_http_upstream_init_round_robin(&cf, uscf) != NGX_OK) { + // see: https://github.com/GUI/nginx-upstream-dynamic-servers/pull/33/files + /* if you read the native code you can find out that all you need to do here is ngx_http_upstream_init_round_robin if you don't use other third party modules in the init process, + otherwise it may cause memory problem if you use keepalive in the upstream block (it reinitialize the keepalive queue, when remote close the connection 2 TTL later, it will crash) + */ ngx_log_error(NGX_LOG_ERR, log, 0, "upstream-serverlist: refresh upstream %V failed, rollback it", &uscf->host); // cf.pool = sl->pool; uscf->servers = old_servers; - init(&cf, uscf); + // this may not work if old servers do not exist? + ngx_http_upstream_init_round_robin(&cf, uscf); return -1; } @@ -1148,7 +1231,68 @@ refresh_upstream(serverlist *sl, ngx_str_t *body, ngx_log_t *log) { } #endif - dump_serverlist(sl); + ngx_shm_t shm = {0}; + shm.size = CACHE_LINE_SIZE * tmp_mcf->serverlists.nelts; + shm.log = log; + ngx_str_set(&shm.name, "upstream-serverlist-shared-zone"); + if (ngx_shm_alloc(&shm) != NGX_OK) { + return -1; + } + for (ngx_uint_t i = 0; i < tmp_mcf->serverlists.nelts; i++) { + serverlist *temp_sl = (serverlist *)tmp_mcf->serverlists.elts + i; + ngx_int_t ret = ngx_shmtx_create(&temp_sl->dump_file_lock, + (ngx_shmtx_sh_t *)(shm.addr + CACHE_LINE_SIZE * i), NULL); + if ( ret != NGX_OK) { + return -1; + } + } + + dump_serverlist(new_sl); + + serverlist *old_sls = mcf->serverlists.elts; + + for (ngx_uint_t i = 0; i < mcf->serverlists.nelts; i++) { + + if (old_sls[i].pool) { + ngx_destroy_pool(old_sls[i].pool); + old_sls[i].pool = NULL; + } + + if (old_sls[i].new_pool) { + ngx_destroy_pool(old_sls[i].new_pool); + old_sls[i].new_pool = NULL; + } + } + + if (old_servers != NULL) { + // destry old old_servers + ngx_array_destroy(old_servers); + old_servers = NULL; + } + + if (old_service_conns != NULL) { + // destroy old conns + ngx_array_destroy(old_service_conns); + old_service_conns = NULL; + } + + if (old_serverlists != NULL) { + // destroy old old_serverlists + ngx_array_destroy(old_serverlists); + old_serverlists = NULL; + } + + if (tmp_mcf->conf_pool_count > 0){ + //destry previous pool + if (tmp_mcf->prev_conf_pool != NULL) { + // destry old conf_pool + ngx_destroy_pool(tmp_mcf->prev_conf_pool); + tmp_mcf->prev_conf_pool = NULL; + } + } + + tmp_mcf->prev_conf_pool = mcf->conf_pool; + tmp_mcf->conf_pool_count++; return 0; } @@ -1420,8 +1564,13 @@ recv_from_service(ngx_event_t *ev) { // the pool is NULL at first run. ngx_destroy_pool(sl->pool); } + sl->pool = sl->new_pool; - sl->new_pool = NULL; + + if (sl->new_pool != NULL) { + ngx_destroy_pool(sl->new_pool); + sl->new_pool = NULL; + } exit: if (sc->serverlists_curr + 1 >= sc->serverlists_end) {