working version with read activity checked on select output rather than on each connection object

This commit is contained in:
yrutschle 2021-03-29 21:56:21 +02:00
parent f0e1aaf82c
commit 40557c58ad
3 changed files with 65 additions and 31 deletions

View File

@ -31,7 +31,7 @@ struct cnx_collection {
struct connection *cnx; /* pointer to array of connections */
int num_fd; /* Number of file descriptors */
int* fd2cnx; /* Array indexed by file descriptor to index in cnx[] */
struct connection** fd2cnx; /* Array indexed by file descriptor to things in cnx[] */
/* We don't try to keep the size of cnx and fd2cnx in sync at all,
* so that the implementation is independant of other uses for file
* descriptors, e.g. if sslh get integrated in another process */
@ -77,7 +77,7 @@ cnx_collection* collection_init(void)
CHECK_ALLOC(collection->fd2cnx, "malloc(collection->fd2cnx)");
for (i = 0; i < collection->num_fd; i++) {
collection->fd2cnx[i] = -1;
collection->fd2cnx[i] = NULL;
}
return collection;
@ -112,20 +112,20 @@ static int extend_cnx(struct cnx_collection* collection)
static int extend_fd2cnx(cnx_collection* collection)
{
int* new_i;
struct connection** new_array;
int i, new_length;
new_length = collection->num_fd + fd_num_alloc;
new_i = realloc(collection->fd2cnx, new_length * sizeof(*new_i));
if (!new_i) {
new_array = realloc(collection->fd2cnx, new_length * sizeof(*new_array));
if (!new_array) {
return -1;
}
collection->fd2cnx = new_i;
collection->fd2cnx = new_array;
for (i = collection->num_fd; i < new_length; i++) {
collection->fd2cnx[i] = -1;
collection->fd2cnx[i] = NULL;
}
collection->num_fd = new_length;
@ -133,7 +133,7 @@ static int extend_fd2cnx(cnx_collection* collection)
}
/* Points the file descriptor to the specified connection index */
int collection_add_fd(cnx_collection* collection, int fd, int cnx_index)
int collection_add_fd(cnx_collection* collection, struct connection* cnx, int fd)
{
if (fd > collection->num_fd) {
int res = extend_fd2cnx(collection);
@ -142,7 +142,9 @@ int collection_add_fd(cnx_collection* collection, int fd, int cnx_index)
return -1;
}
}
collection->fd2cnx[fd] = cnx_index;
collection->fd2cnx[fd] = cnx;
int cnx_index = cnx - collection->cnx;
if(cfg.verbose) fprintf(stderr, "added fd %d on slot %d\n", fd, cnx_index);
return 0;
}
@ -153,8 +155,6 @@ int collection_alloc_cnx_from_fd(struct cnx_collection* collection, int fd)
int free, res;
struct connection* cnx = collection->cnx;
if (cfg.verbose) fprintf(stderr, "collection_add_fd %d\n", fd);
/* Find an empty slot */
for (free = 0; (free < collection->num_cnx) && (cnx[free].q[0].fd != -1); free++) {
/* nothing */
@ -170,7 +170,7 @@ int collection_alloc_cnx_from_fd(struct cnx_collection* collection, int fd)
collection->cnx[free].state = ST_PROBING;
collection->cnx[free].probe_timeout = time(NULL) + cfg.timeout;
collection_add_fd(collection, fd, free);
collection_add_fd(collection, &collection->cnx[free], fd);
if (cfg.verbose)
fprintf(stderr, "accepted fd %d on slot %d\n", fd, free);
@ -180,18 +180,24 @@ int collection_alloc_cnx_from_fd(struct cnx_collection* collection, int fd)
/* Remove a connection from the collection */
int collection_remove_cnx(cnx_collection* collection, struct connection *cnx)
{
collection->fd2cnx[cnx->q[0].fd] = -1;
collection->fd2cnx[cnx->q[1].fd] = -1;
collection->fd2cnx[cnx->q[0].fd] = NULL;
collection->fd2cnx[cnx->q[1].fd] = NULL;
init_cnx(cnx);
return 0;
}
/* Returns the indexed connection in the collection */
struct connection* collection_get_cnx(struct cnx_collection* collection, int index)
struct connection* collection_get_cnx_from_index(struct cnx_collection* collection, int index)
{
return & collection->cnx[index];
}
/* Returns the connection that contains the file descriptor */
struct connection* collection_get_cnx_from_fd(struct cnx_collection* collection, int fd)
{
return collection->fd2cnx[fd];
}
/* Returns the number of connections in the collection */
int collection_get_length(cnx_collection* collection)
{

View File

@ -8,12 +8,13 @@ cnx_collection* collection_init(void);
void collection_destroy(cnx_collection* collection);
int collection_alloc_cnx_from_fd(cnx_collection* collection, int fd);
int collection_add_fd(cnx_collection* collection, int fd, int cnx_index);
int collection_add_fd(cnx_collection* collection, struct connection* cnx, int fd);
/* Remove a connection from the collection */
int collection_remove_cnx(cnx_collection* collection, struct connection *cnx);
struct connection* collection_get_cnx(cnx_collection* collection, int index);
struct connection* collection_get_cnx_from_index(cnx_collection* collection, int index);
struct connection* collection_get_cnx_from_fd(struct cnx_collection* collection, int fd);
/* Returns the number of connections in the collection */
int collection_get_length(cnx_collection* collection);

View File

@ -94,6 +94,9 @@ static int accept_new_connection(int listen_socket, struct cnx_collection *colle
{
int in_socket, res;
if (cfg.verbose) fprintf(stderr, "accepting from %d\n", listen_socket);
in_socket = accept(listen_socket, 0, 0);
CHECK_RES_RETURN(in_socket, "accept", -1);
@ -119,9 +122,10 @@ static int accept_new_connection(int listen_socket, struct cnx_collection *colle
/* Connect queue 1 of connection to SSL; returns new file descriptor */
static int connect_queue(cnx_collection* collection, int cnx_index, struct select_info* fd_info)
static int connect_queue(cnx_collection* collection,
struct connection* cnx,
struct select_info* fd_info)
{
struct connection *cnx = collection_get_cnx(collection, cnx_index);
struct queue *q = &cnx->q[1];
q->fd = connect_addr(cnx, cnx->q[0].fd);
@ -134,7 +138,7 @@ static int connect_queue(cnx_collection* collection, int cnx_index, struct selec
FD_CLR(cnx->q[0].fd, &fd_info->fds_r);
}
FD_SET(q->fd, &fd_info->fds_r);
collection_add_fd(collection, q->fd, cnx_index);
collection_add_fd(collection, cnx, q->fd);
return q->fd;
} else {
tidy_connection(cnx, fd_info);
@ -270,9 +274,10 @@ static int is_fd_active(int fd, fd_set* set)
* IN/OUT cnx: connection data, updated if connected
* IN/OUT info: updated if connected
* */
static void probing_read_process(cnx_collection* collection, int cnx_index, struct select_info* fd_info)
static void probing_read_process(cnx_collection* collection,
struct connection* cnx,
struct select_info* fd_info)
{
struct connection* cnx = collection_get_cnx(collection, cnx_index);
int res;
/* If timed out it's SSH, otherwise the client sent
@ -312,7 +317,7 @@ static void probing_read_process(cnx_collection* collection, int cnx_index, stru
tidy_connection(cnx, fd_info);
res = -1;
} else {
res = connect_queue(collection, cnx_index, fd_info);
res = connect_queue(collection, cnx, fd_info);
}
if (res >= fd_info->max_fd)
@ -321,9 +326,14 @@ static void probing_read_process(cnx_collection* collection, int cnx_index, stru
/* Process a connection that is active in read */
static void cnx_read_process(cnx_collection* collection, int cnx_index, int active_q, struct select_info* fd_info)
static void cnx_read_process(cnx_collection* collection,
struct connection* cnx,
int fd,
struct select_info* fd_info)
{
struct connection* cnx = collection_get_cnx(collection, cnx_index);
/* Determine active queue (0 or 1): if fd is that of q[1], active_q = 1,
* otherwise it's 0 */
int active_q = (cnx->q[1].fd == fd);
switch (cnx->state) {
@ -334,7 +344,7 @@ static void cnx_read_process(cnx_collection* collection, int cnx_index, int act
exit(1);
}
probing_read_process(collection, cnx_index, fd_info);
probing_read_process(collection, cnx, fd_info);
break;
@ -411,12 +421,14 @@ void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
if (in_socket >= fd_info.max_fd)
fd_info.max_fd = in_socket + 1;;
}
/* don't also process it as a read socket */
FD_CLR(listen_sockets[i].socketfd, &readfds);
}
}
/* Check all sockets for write activity */
for (i = 0; i < collection_get_length(fd_info.collection); i++) {
struct connection* cnx = collection_get_cnx(fd_info.collection, i);
struct connection* cnx = collection_get_cnx_from_index(fd_info.collection, i);
if (cnx->q[0].fd != -1) {
for (j = 0; j < 2; j++) {
if (is_fd_active(cnx->q[j].fd, &writefds)) {
@ -439,18 +451,33 @@ void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
}
}
/* Check all sockets for read activity */
/* Check all sockets for timeouts */
for (i = 0; i < collection_get_length(fd_info.collection); i++) {
struct connection* cnx = collection_get_cnx(fd_info.collection, i);
struct connection* cnx = collection_get_cnx_from_index(fd_info.collection, i);
for (j = 0; j < 2; j++) {
if (is_fd_active(cnx->q[j].fd, &readfds) ||
if (/*is_fd_active(cnx->q[j].fd, &readfds) || */
((cnx->state == ST_PROBING) && (cnx->probe_timeout < time(NULL)))) {
if (cfg.verbose)
fprintf(stderr, "processing read fd%d slot %d\n", j, i);
cnx_read_process(fd_info.collection, i, j, &fd_info);
cnx_read_process(fd_info.collection, cnx, i, &fd_info);
}
}
}
/* Check all sockets for read activity */
for (i = 0; i < fd_info.max_fd; i++) {
fprintf(stderr, "checking fd %d for read activity\n", i);
if (FD_ISSET(i, &readfds)) {
struct connection* cnx = collection_get_cnx_from_fd(fd_info.collection, i);
if (cfg.verbose) {
fprintf(stderr, "read activity on fd %d; cnx:\n", i);
dump_connection(cnx);
}
cnx_read_process(fd_info.collection, cnx, i, &fd_info);
}
}
}
}