refactor: abstract connection list management to a new 'collection' type

This commit is contained in:
yrutschle 2021-03-03 14:38:24 +01:00
parent 49d4080afd
commit 7e63dedca3

View File

@ -36,6 +36,12 @@ struct select_info {
fd_set fds_r, fds_w; /* reference fd sets (used to init working copies) */ fd_set fds_r, fds_w; /* reference fd sets (used to init working copies) */
}; };
/* Info to keep track of all connections */
struct cnx_collection {
int num_cnx; /* Number of connections in *cnx */
struct connection *cnx; /* pointer to array of connections */
};
/* cnx_num_alloc is the number of connection to allocate at once (at start-up, /* cnx_num_alloc is the number of connection to allocate at once (at start-up,
* and then every time we get too many simultaneous connections: e.g. start * and then every time we get too many simultaneous connections: e.g. start
* with 100 slots, then if we get more than 100 connections allocate another * with 100 slots, then if we get more than 100 connections allocate another
@ -45,6 +51,46 @@ struct select_info {
*/ */
static long cnx_num_alloc; static long cnx_num_alloc;
static void init_collection(struct cnx_collection* collection)
{
int i;
memset(collection, 0, sizeof(*collection));
cnx_num_alloc = getpagesize() / sizeof(struct connection);
collection->num_cnx = cnx_num_alloc; /* Start with a set pool of slots */
collection->cnx = malloc(collection->num_cnx * sizeof(struct connection));
CHECK_ALLOC(collection->cnx, "malloc");
for (i = 0; i < collection->num_cnx; i++) {
init_cnx(&collection->cnx[i]);
}
}
/* Increases the number of slots available in a collection of connections
* After calling, collection->cnx might have moved
* */
static int extend_collection(struct cnx_collection* collection)
{
struct connection* new;
int i, new_length = collection->num_cnx + cnx_num_alloc;
if (cfg.verbose)
fprintf(stderr, "allocating %ld more slots.\n", cnx_num_alloc);
new = realloc(&collection->cnx, new_length * sizeof(collection->cnx[0]));
if (!new) return -1;
collection->cnx = new;
for (i = collection->num_cnx; i < new_length; i++) {
init_cnx(&collection->cnx[i]);
}
collection->num_cnx = new_length;
return 0;
}
/* Make the file descriptor non-block */ /* Make the file descriptor non-block */
static int set_nonblock(int fd) static int set_nonblock(int fd)
{ {
@ -94,10 +140,10 @@ static int fd_is_in_range(int fd) {
/* Accepts a connection from the main socket and assigns it to an empty slot. /* Accepts a connection from the main socket and assigns it to an empty slot.
* If no slots are available, allocate another few. If that fails, drop the * If no slots are available, allocate another few. If that fails, drop the
* connexion */ * connexion */
static int accept_new_connection(int listen_socket, struct connection *cnx[], int* cnx_size) static int accept_new_connection(int listen_socket, struct cnx_collection *collection)
{ {
int in_socket, free, i, res; int in_socket, free, res;
struct connection *new; struct connection* cnx = collection->cnx;
in_socket = accept(listen_socket, 0, 0); in_socket = accept(listen_socket, 0, 0);
CHECK_RES_RETURN(in_socket, "accept", -1); CHECK_RES_RETURN(in_socket, "accept", -1);
@ -114,27 +160,20 @@ static int accept_new_connection(int listen_socket, struct connection *cnx[], in
} }
/* Find an empty slot */ /* Find an empty slot */
for (free = 0; (free < *cnx_size) && ((*cnx)[free].q[0].fd != -1); free++) { for (free = 0; (free < collection->num_cnx) && (cnx[free].q[0].fd != -1); free++) {
/* nothing */ /* nothing */
} }
if (free >= *cnx_size) { if (free >= collection->num_cnx) {
if (cfg.verbose) res = extend_collection(collection);
fprintf(stderr, "buying more slots from the slot machine.\n"); if (!res) {
new = realloc(*cnx, (*cnx_size + cnx_num_alloc) * sizeof((*cnx)[0])); log_message(LOG_ERR, "unable to extend collection -- dropping connection\n");
if (!new) {
log_message(LOG_ERR, "unable to realloc -- dropping connection\n");
close(in_socket); close(in_socket);
return -1; return -1;
} }
*cnx = new;
*cnx_size += cnx_num_alloc;
for (i = free; i < *cnx_size; i++) {
init_cnx(&(*cnx)[i]);
} }
} collection->cnx[free].q[0].fd = in_socket;
(*cnx)[free].q[0].fd = in_socket; collection->cnx[free].state = ST_PROBING;
(*cnx)[free].state = ST_PROBING; collection->cnx[free].probe_timeout = time(NULL) + cfg.timeout;
(*cnx)[free].probe_timeout = time(NULL) + cfg.timeout;
if (cfg.verbose) if (cfg.verbose)
fprintf(stderr, "accepted fd %d on slot %d\n", in_socket, free); fprintf(stderr, "accepted fd %d on slot %d\n", in_socket, free);
@ -324,7 +363,7 @@ static void probing_read_process(struct connection* cnx, struct select_info* fd_
switch (fork()) { switch (fork()) {
case 0: /* child */ case 0: /* child */
/* TODO: close all file descriptors except 2 */ /* TODO: close all file descriptors except 2 */
free(cnx); /* free(cnx); */
connect_proxy(cnx); connect_proxy(cnx);
exit(0); exit(0);
case -1: log_message(LOG_ERR, "fork failed: err %d: %s\n", errno, strerror(errno)); case -1: log_message(LOG_ERR, "fork failed: err %d: %s\n", errno, strerror(errno));
@ -392,8 +431,7 @@ void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
struct timeval tv; struct timeval tv;
int i, j, res; int i, j, res;
int in_socket = 0; int in_socket = 0;
struct connection *cnx; struct cnx_collection collection;
int num_cnx; /* Number of connections in *cnx */
fd_info.num_probing = 0; fd_info.num_probing = 0;
FD_ZERO(&fd_info.fds_r); FD_ZERO(&fd_info.fds_r);
@ -405,13 +443,7 @@ void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
} }
fd_info.max_fd = listen_sockets[num_addr_listen-1].socketfd + 1; fd_info.max_fd = listen_sockets[num_addr_listen-1].socketfd + 1;
cnx_num_alloc = getpagesize() / sizeof(struct connection); init_collection(&collection);
num_cnx = cnx_num_alloc; /* Start with a set pool of slots */
cnx = malloc(num_cnx * sizeof(struct connection));
CHECK_ALLOC(cnx, "malloc");
for (i = 0; i < num_cnx; i++)
init_cnx(&cnx[i]);
while (1) while (1)
{ {
@ -432,7 +464,7 @@ void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
/* Check main socket for new connections */ /* Check main socket for new connections */
for (i = 0; i < num_addr_listen; i++) { for (i = 0; i < num_addr_listen; i++) {
if (FD_ISSET(listen_sockets[i].socketfd, &readfds)) { if (FD_ISSET(listen_sockets[i].socketfd, &readfds)) {
in_socket = accept_new_connection(listen_sockets[i].socketfd, &cnx, &num_cnx); in_socket = accept_new_connection(listen_sockets[i].socketfd, &collection);
if (in_socket > 0) { if (in_socket > 0) {
fd_info.num_probing++; fd_info.num_probing++;
FD_SET(in_socket, &fd_info.fds_r); FD_SET(in_socket, &fd_info.fds_r);
@ -443,7 +475,8 @@ void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
} }
/* Check all sockets for write activity */ /* Check all sockets for write activity */
for (i = 0; i < num_cnx; i++) { struct connection* cnx = collection.cnx;
for (i = 0; i < collection.num_cnx; i++) {
if (cnx[i].q[0].fd != -1) { if (cnx[i].q[0].fd != -1) {
for (j = 0; j < 2; j++) { for (j = 0; j < 2; j++) {
if (is_fd_active(cnx[i].q[j].fd, &writefds)) { if (is_fd_active(cnx[i].q[j].fd, &writefds)) {
@ -467,7 +500,7 @@ void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
} }
/* Check all sockets for read activity */ /* Check all sockets for read activity */
for (i = 0; i < num_cnx; i++) { for (i = 0; i < collection.num_cnx; i++) {
for (j = 0; j < 2; j++) { for (j = 0; j < 2; j++) {
if (is_fd_active(cnx[i].q[j].fd, &readfds) || if (is_fd_active(cnx[i].q[j].fd, &readfds) ||
((cnx[i].state == ST_PROBING) && (cnx[i].probe_timeout < time(NULL)))) { ((cnx[i].state == ST_PROBING) && (cnx[i].probe_timeout < time(NULL)))) {