New sslh-ev: this is functionaly equivalent to sslh-select (mono-process, only forks for specified protocols), but based on libev, which should make it scalable to large numbers of connections.

This commit is contained in:
yrutschle 2021-11-12 09:05:18 +01:00
commit 711c11c820
11 changed files with 754 additions and 466 deletions

View File

@ -1,4 +1,9 @@
vNEXT: vNEXT:
New sslh-ev: this is functionaly equivalent to
sslh-select (mono-process, only forks for specified
protocols), but based on libev, which should make it
scalable to large numbers of connections.
New log system: instead of --verbose with arbitrary New log system: instead of --verbose with arbitrary
levels, there are now several message classes. Each levels, there are now several message classes. Each
message class can be set to go to stderr, syslog, or message class can be set to go to stderr, syslog, or

View File

@ -27,7 +27,10 @@ CC ?= gcc
CFLAGS ?=-Wall -DLIBPCRE -g $(CFLAGS_COV) CFLAGS ?=-Wall -DLIBPCRE -g $(CFLAGS_COV)
LIBS=-lm -lpcre2-8 LIBS=-lm -lpcre2-8
OBJS=sslh-conf.o common.o log.o sslh-main.o probe.o tls.o argtable3.o udp-listener.o collection.o gap.o OBJS=sslh-conf.o common.o log.o sslh-main.o probe.o tls.o argtable3.o collection.o gap.o
FORK_OBJS=sslh-fork.o $(OBJS)
SELECT_OBJS=sslh-select.o $(OBJS) processes.o udp-listener.o
EV_OBJS=sslh-ev.o $(OBJS) processes.o udp-listener.o
CONDITIONAL_TARGETS= CONDITIONAL_TARGETS=
@ -70,21 +73,25 @@ all: sslh $(MAN) echosrv $(CONDITIONAL_TARGETS)
version.h: version.h:
./genver.sh >version.h ./genver.sh >version.h
sslh: sslh-fork sslh-select sslh: sslh-fork sslh-select sslh-ev
$(OBJS): version.h common.h collection.h sslh-conf.h gap.h $(OBJS): version.h common.h collection.h sslh-conf.h gap.h
sslh-conf.c sslh-conf.h: sslhconf.cfg sslh-conf.c sslh-conf.h: sslhconf.cfg
conf2struct sslhconf.cfg conf2struct sslhconf.cfg
sslh-fork: version.h $(OBJS) sslh-fork.o Makefile sslh-fork: version.h Makefile $(FORK_OBJS)
$(CC) $(CFLAGS) $(LDFLAGS) -o sslh-fork sslh-fork.o $(OBJS) $(LIBS) $(CC) $(CFLAGS) $(LDFLAGS) -o sslh-fork $(FORK_OBJS) $(LIBS)
#strip sslh-fork #strip sslh-fork
sslh-select: version.h $(OBJS) sslh-select.o Makefile sslh-select: version.h $(SELECT_OBJS) Makefile
$(CC) $(CFLAGS) $(LDFLAGS) -o sslh-select sslh-select.o $(OBJS) $(LIBS) $(CC) $(CFLAGS) $(LDFLAGS) -o sslh-select $(SELECT_OBJS) $(LIBS)
#strip sslh-select #strip sslh-select
sslh-ev: version.h $(EV_OBJS) Makefile
$(CC) $(CFLAGS) $(LDFLAGS) -o sslh-ev $(EV_OBJS) $(LIBS) -lev
#strip sslh-ev
systemd-sslh-generator: systemd-sslh-generator.o systemd-sslh-generator: systemd-sslh-generator.o
$(CC) $(CFLAGS) $(LDFLAGS) -o systemd-sslh-generator systemd-sslh-generator.o -lconfig $(CC) $(CFLAGS) $(LDFLAGS) -o systemd-sslh-generator systemd-sslh-generator.o -lconfig

View File

@ -424,6 +424,7 @@ void init_cnx(struct connection *cnx)
void dump_connection(struct connection *cnx) void dump_connection(struct connection *cnx)
{ {
print_message(msg_int_error, "type: %s\n", cnx->type == SOCK_DGRAM ? "UDP" : "TCP");
print_message(msg_int_error, "state: %d\n", cnx->state); print_message(msg_int_error, "state: %d\n", cnx->state);
print_message(msg_int_error, "0: fd %d, %d deferred\n", cnx->q[0].fd, cnx->q[0].deferred_data_size); print_message(msg_int_error, "0: fd %d, %d deferred\n", cnx->q[0].fd, cnx->q[0].deferred_data_size);
hexdump(msg_int_error, cnx->q[0].deferred_data, cnx->q[0].deferred_data_size); hexdump(msg_int_error, cnx->q[0].deferred_data, cnx->q[0].deferred_data_size);

View File

@ -112,6 +112,7 @@ void tcp_echo(struct listen_endpoint* listen_socket)
exit(0); exit(0);
} }
close(in_socket); close(in_socket);
waitpid(-1, NULL, WNOHANG);
} }
} }

2
log.h
View File

@ -1,6 +1,8 @@
#ifndef LOG_H #ifndef LOG_H
#define LOG_H #define LOG_H
#include "common.h"
void setup_syslog(const char* bin_name); void setup_syslog(const char* bin_name);
void log_connection(struct connection_desc* desc, const struct connection *cnx); void log_connection(struct connection_desc* desc, const struct connection *cnx);

403
processes.c Normal file
View File

@ -0,0 +1,403 @@
/*
Processes that are common to sslh-ev and sslh-select
# Copyright (C) 2021 Yves Rutschle
#
# This program is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more
# details.
#
# The full text for the General Public License is here:
# http://www.gnu.org/licenses/gpl.html
*/
#include "udp-listener.h"
#include "processes.h"
#include "probe.h"
#include "log.h"
/* Removes cnx from probing list */
void remove_probing_cnx(struct loop_info* fd_info, struct connection* cnx)
{
gap_remove_ptr(fd_info->probing_list, cnx, fd_info->num_probing);
fd_info->num_probing--;
}
void add_probing_cnx(struct loop_info* fd_info, struct connection* cnx)
{
gap_set(fd_info->probing_list, fd_info->num_probing, cnx);
fd_info->num_probing++;
}
/* Returns the queue index that contains the specified file descriptor */
static int active_queue(struct connection* cnx, int fd)
{
if (cnx->q[0].fd == fd) return 0;
if (cnx->q[1].fd == fd) return 1;
print_message(msg_int_error, "file descriptor %d not found in connection object\n", fd);
return -1;
}
int tidy_connection(struct connection *cnx, struct loop_info* fd_info)
{
int i;
for (i = 0; i < 2; i++) {
if (cnx->q[i].fd != -1) {
print_message(msg_fd, "closing fd %d\n", cnx->q[i].fd);
watchers_del_read(fd_info->watchers, cnx->q[i].fd);
watchers_del_write(fd_info->watchers, cnx->q[i].fd);
close(cnx->q[i].fd);
if (cnx->q[i].deferred_data)
free(cnx->q[i].deferred_data);
}
}
collection_remove_cnx(fd_info->collection, cnx);
return 0;
}
/* shovels data from active fd to the other
returns after one socket closed or operation would block
*/
static void shovel(struct connection *cnx, int active_fd, struct loop_info* fd_info)
{
struct queue *read_q, *write_q;
read_q = &cnx->q[active_fd];
write_q = &cnx->q[1-active_fd];
print_message(msg_fd, "activity on fd%d\n", read_q->fd);
switch(fd2fd(write_q, read_q)) {
case -1:
case FD_CNXCLOSED:
tidy_connection(cnx, fd_info);
break;
case FD_STALLED:
watchers_add_write(fd_info->watchers, write_q->fd);
watchers_del_read(fd_info->watchers, read_q->fd);
break;
default: /* Nothing */
break;
}
}
/* Process a connection that is active in read */
static void tcp_read_process(struct loop_info* fd_info,
int fd)
{
cnx_collection* collection = fd_info->collection;
struct connection* cnx = collection_get_cnx_from_fd(collection, fd);
/* Determine active queue (0 or 1): if fd is that of q[1], active_q = 1,
* otherwise it's 0 */
int active_q = active_queue(cnx, fd);
switch (cnx->state) {
case ST_PROBING:
if (active_q == 1) {
print_message(msg_int_error, "Activity on fd2 while probing, impossible\n");
dump_connection(cnx);
exit(1);
}
probing_read_process(cnx, fd_info);
break;
case ST_SHOVELING:
shovel(cnx, active_q, fd_info);
break;
default: /* illegal */
print_message(msg_int_error, "Illegal connection state %d\n", cnx->state);
dump_connection(cnx);
exit(1);
}
}
void cnx_read_process(struct loop_info* fd_info, int fd)
{
cnx_collection* collection = fd_info->collection;
struct connection* cnx = collection_get_cnx_from_fd(collection, fd);
switch (cnx->type) {
case SOCK_STREAM:
tcp_read_process(fd_info, fd);
break;
case SOCK_DGRAM:
udp_s2c_forward(cnx);
break;
default:
print_message(msg_int_error, "cnx_read_process: Illegal connection type %d\n", cnx->type);
dump_connection(cnx);
exit(1);
}
}
/* Process a connection that is active in write */
void cnx_write_process(struct loop_info* fd_info, int fd)
{
struct connection* cnx = collection_get_cnx_from_fd(fd_info->collection, fd);
int res;
int queue = active_queue(cnx, fd);
res = flush_deferred(&cnx->q[queue]);
if ((res == -1) && ((errno == EPIPE) || (errno == ECONNRESET))) {
if (cnx->state == ST_PROBING) remove_probing_cnx(fd_info, cnx);
tidy_connection(cnx, fd_info);
} else {
/* If no deferred data is left, stop monitoring the fd
* for write, and restart monitoring the other one for reads*/
if (!cnx->q[queue].deferred_data_size) {
watchers_del_write(fd_info->watchers, cnx->q[queue].fd);
watchers_add_read(fd_info->watchers, cnx->q[1-queue].fd);
}
}
}
/* Accepts a connection from the main socket and assigns it to an empty slot.
* If no slots are available, allocate another few. If that fails, drop the
* connexion */
static struct connection* accept_new_connection(int listen_socket, struct cnx_collection *collection)
{
int in_socket, res;
print_message(msg_fd, "accepting from %d\n", listen_socket);
in_socket = accept(listen_socket, 0, 0);
CHECK_RES_RETURN(in_socket, "accept", NULL);
res = set_nonblock(in_socket);
if (res == -1) {
close(in_socket);
return NULL;
}
struct connection* cnx = collection_alloc_cnx_from_fd(collection, in_socket);
if (!cnx) {
close(in_socket);
return NULL;
}
return cnx;
}
/* Process a connection that accepts a socket
* (For UDP, this means all traffic coming from remote clients)
* Returns new file descriptor, or -1
* */
int cnx_accept_process(struct loop_info* fd_info, struct listen_endpoint* listen_socket)
{
int fd = listen_socket->socketfd;
int type = listen_socket->type;
struct connection* cnx;
int new_fd = -1;
switch (type) {
case SOCK_STREAM:
cnx = accept_new_connection(fd, fd_info->collection);
if (cnx) {
add_probing_cnx(fd_info, cnx);
new_fd = cnx->q[0].fd;
}
break;
case SOCK_DGRAM:
new_fd = udp_c2s_forward(fd, fd_info);
print_message(msg_fd, "new_fd %d\n", new_fd);
if (new_fd == -1)
return -1;
break;
default:
print_message(msg_int_error, "Inconsistent cnx type: %d\n", type);
exit(1);
}
watchers_add_read(fd_info->watchers, new_fd);
return new_fd;
}
/* shovels data from one fd to the other and vice-versa
returns after one socket closed
*/
static void shovel_single(struct connection *cnx)
{
fd_set fds_r, fds_w;
int res, i;
int max_fd = MAX(cnx->q[0].fd, cnx->q[1].fd) + 1;
FD_ZERO(&fds_r);
FD_ZERO(&fds_w);
while (1) {
for (i = 0; i < 2; i++) {
if (cnx->q[i].deferred_data_size) {
FD_SET(cnx->q[i].fd, &fds_w);
FD_CLR(cnx->q[1-i].fd, &fds_r);
} else {
FD_CLR(cnx->q[i].fd, &fds_w);
FD_SET(cnx->q[1-i].fd, &fds_r);
}
}
res = select(
max_fd,
&fds_r,
&fds_w,
NULL,
NULL
);
CHECK_RES_DIE(res, "select");
for (i = 0; i < 2; i++) {
if (FD_ISSET(cnx->q[i].fd, &fds_w)) {
res = flush_deferred(&cnx->q[i]);
if ((res == -1) && ((errno == EPIPE) || (errno == ECONNRESET))) {
print_message(msg_fd, "%s socket closed\n", i ? "server" : "client");
return;
}
}
if (FD_ISSET(cnx->q[i].fd, &fds_r)) {
res = fd2fd(&cnx->q[1-i], &cnx->q[i]);
if (!res) {
print_message(msg_fd, "socket closed\n");
return;
}
}
}
}
}
/* Child process that makes internal connection and proxies
*/
static void connect_proxy(struct connection *cnx)
{
int in_socket;
int out_socket;
/* Minimize the file descriptor value to help select() */
in_socket = dup(cnx->q[0].fd);
if (in_socket == -1) {
in_socket = cnx->q[0].fd;
} else {
close(cnx->q[0].fd);
cnx->q[0].fd = in_socket;
}
/* Connect the target socket */
out_socket = connect_addr(cnx, in_socket, BLOCKING);
CHECK_RES_DIE(out_socket, "connect");
cnx->q[1].fd = out_socket;
log_connection(NULL, cnx);
shovel_single(cnx);
close(in_socket);
close(out_socket);
print_message(msg_fd, "connection closed down\n");
exit(0);
}
/* Connect queue 1 of connection to SSL; returns new file descriptor */
static int connect_queue(struct connection* cnx,
struct loop_info* fd_info)
{
struct queue *q = &cnx->q[1];
q->fd = connect_addr(cnx, cnx->q[0].fd, NON_BLOCKING);
if (q->fd != -1) {
log_connection(NULL, cnx);
flush_deferred(q);
if (q->deferred_data) {
/*
FD_SET(q->fd, &fd_info->watchers->fds_w);
FD_CLR(cnx->q[0].fd, &fd_info->watchers->fds_r); */
watchers_add_write(fd_info->watchers, q->fd);
watchers_del_read(fd_info->watchers, cnx->q[0].fd);
}
/* FD_SET(q->fd, &fd_info->watchers->fds_r); */
watchers_add_read(fd_info->watchers, q->fd);
collection_add_fd(fd_info->collection, cnx, q->fd);
return q->fd;
} else {
tidy_connection(cnx, fd_info);
return -1;
}
}
/* Process read activity on a socket in probe state
* IN/OUT cnx: connection data, updated if connected
* IN/OUT info: updated if connected
* */
void probing_read_process(struct connection* cnx,
struct loop_info* fd_info)
{
int res;
/* If timed out it's SSH, otherwise the client sent
* data so probe the protocol */
if ((cnx->probe_timeout < time(NULL))) {
cnx->proto = timeout_protocol();
print_message(msg_fd, "timed out, connect to %s\n", cnx->proto->name);
} else {
res = probe_client_protocol(cnx);
if (res == PROBE_AGAIN)
return;
}
remove_probing_cnx(fd_info, cnx);
cnx->state = ST_SHOVELING;
/* libwrap check if required for this protocol */
if (cnx->proto->service &&
check_access_rights(cnx->q[0].fd, cnx->proto->service)) {
tidy_connection(cnx, fd_info);
res = -1;
} else if (cnx->proto->fork) {
switch (fork()) {
case 0: /* child */
/* TODO: close all file descriptors except 2 */
/* free(cnx); */
connect_proxy(cnx);
exit(0);
case -1: print_message(msg_system_error, "fork failed: err %d: %s\n", errno, strerror(errno));
break;
default: /* parent */
break;
}
tidy_connection(cnx, fd_info);
res = -1;
} else {
res = connect_queue(cnx, fd_info);
}
}

43
processes.h Normal file
View File

@ -0,0 +1,43 @@
#ifndef PROCESSES_H
#define PROCESSES_H
#include "common.h"
#include "collection.h"
#include "gap.h"
/* Provided by event loop, sslh-ev or sslh-select, for implementation-dependant
* data */
typedef struct watchers watchers;
/* Global state for a loop */
struct loop_info {
int num_probing; /* Number of connections currently probing
* We use this to know if we need to time out of
* select() */
gap_array* probing_list; /* Pointers to cnx that are in probing mode */
watchers* watchers;
cnx_collection* collection; /* Collection of connections linked to this loop */
time_t next_timeout; /* time at which next UDP connection times out */
};
void cnx_read_process(struct loop_info* fd_info, int fd);
void cnx_write_process(struct loop_info* fd_info, int fd);
int cnx_accept_process(struct loop_info* fd_info, struct listen_endpoint* listen_socket);
void probing_read_process(struct connection* cnx, struct loop_info* fd_info);
void remove_probing_cnx(struct loop_info* fd_info, struct connection* cnx);
void add_probing_cnx(struct loop_info* fd_info, struct connection* cnx);
int tidy_connection(struct connection *cnx, struct loop_info* fd_info);
/* These must be declared in the loop handler, sslh-ev or sslh-select */
void watchers_add_read(watchers* w, int fd);
void watchers_del_read(watchers* w, int fd);
void watchers_add_write(watchers* w, int fd);
void watchers_del_write(watchers* w, int fd);
int watchers_maxfd(watchers* w);
#endif

161
sslh-ev.c Normal file
View File

@ -0,0 +1,161 @@
/*
sslh-ev: mono-processus server based on libev
# Copyright (C) 2021 Yves Rutschle
#
# This program is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more
# details.
#
# The full text for the General Public License is here:
# http://www.gnu.org/licenses/gpl.html
*/
#include <stdlib.h>
#include <ev.h>
#include "processes.h"
#include "gap.h"
#include "log.h"
const char* server_type = "sslh-ev";
static struct ev_loop* loop;
/* Libev watchers */
struct watchers {
/* one set of ev_io for read, one for write, indexed by file descriptor */
gap_array *ev_ior, *ev_iow;
struct listen_endpoint* listen_sockets;
gap_array* fd2ls; /* Array indexed by file descriptor, pointing to listen_sockets */
int max_fd; /* legacy to be removed, still required for UDP */
};
static void cnx_read_cb(EV_P_ ev_io *w, int revents);
static void cnx_write_cb(EV_P_ ev_io *w, int wevents);
static void cnx_accept_cb(EV_P_ ev_io *w, int revents);
static void watchers_init(watchers** w, struct listen_endpoint* listen_sockets,
int num_addr_listen)
{
*w = malloc(sizeof(**w));
(*w)->ev_ior = gap_init(num_addr_listen);
(*w)->ev_iow = gap_init(num_addr_listen);
(*w)->listen_sockets = listen_sockets;
(*w)->fd2ls = gap_init(0);
/* Create watchers for listen sockets */
for (int i = 0; i < num_addr_listen; i++) {
ev_io* io = malloc(sizeof(*io));
ev_io_init(io, &cnx_accept_cb, listen_sockets[i].socketfd, EV_READ);
ev_io_start(EV_A_ io);
gap_set((*w)->ev_ior, i, io);
gap_set((*w)->fd2ls, listen_sockets[i].socketfd, &listen_sockets[i]);
set_nonblock(listen_sockets[i].socketfd);
}
}
void watchers_add_read(watchers* w, int fd)
{
ev_io* io = gap_get(w->ev_ior, fd);
if (!io) {
io = malloc(sizeof(*io));
ev_io_init(io, &cnx_read_cb, fd, EV_READ);
ev_io_set(io, fd, EV_READ);
gap_set(w->ev_ior, fd, io);
}
ev_io_start(loop, io);
if (fd > w->max_fd) w->max_fd = fd + 1;
}
void watchers_del_read(watchers* w, int fd)
{
ev_io* io = gap_get(w->ev_ior, fd);
if (io) ev_io_stop(EV_A_ io);
}
void watchers_add_write(watchers* w, int fd)
{
ev_io* io = gap_get(w->ev_iow, fd);
if (!io) {
io = malloc(sizeof(*io));
ev_io_init(io, &cnx_write_cb, fd, EV_WRITE);
ev_io_set(io, fd, EV_WRITE);
gap_set(w->ev_iow, fd, io);
}
ev_io_start(loop, io);
if (fd > w->max_fd) w->max_fd = fd + 1;
}
void watchers_del_write(watchers* w, int fd)
{
ev_io* io = gap_get(w->ev_iow, fd);
if (io) ev_io_stop(EV_A_ io);
}
/* To remove after moving UDP lookups to hash table */
int watchers_maxfd(watchers* w)
{
return w->max_fd;
}
/* /watchers */
#include "processes.h"
/* Libev callbacks */
static void cnx_read_cb(EV_P_ ev_io *w, int revents)
{
struct loop_info* info = ev_userdata(EV_A);
cnx_read_process(info, w->fd);
}
static void cnx_write_cb(EV_P_ ev_io *w, int wevents)
{
struct loop_info* info = ev_userdata(EV_A);
cnx_write_process(info, w->fd);
}
static void cnx_accept_cb(EV_P_ ev_io *w, int revents)
{
struct loop_info* info = ev_userdata(EV_A);
cnx_accept_process(info, gap_get(info->watchers->fd2ls, w->fd));
}
void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
{
struct loop_info ev_info = {0};
loop = EV_DEFAULT;
ev_info.collection = collection_init(0);
ev_info.probing_list = gap_init(0);
watchers_init(&ev_info.watchers, listen_sockets, num_addr_listen);
ev_set_userdata(EV_A_ &ev_info);
ev_run(EV_A_ 0);
}
void start_shoveler(int listen_socket) {
print_message(msg_config_error, "inetd mode is not supported in libev mode\n");
exit(1);
}

View File

@ -30,57 +30,75 @@
#define __LINUX__ #define __LINUX__
#include <limits.h>
#include "common.h" #include "common.h"
#include "probe.h" #include "probe.h"
#include "udp-listener.h" #include "udp-listener.h"
#include "collection.h" #include "collection.h"
#include "processes.h"
#include "gap.h" #include "gap.h"
#include "log.h" #include "log.h"
const char* server_type = "sslh-select"; const char* server_type = "sslh-select";
/* Global state for a select() loop */ /* watcher type for a select() loop */
struct select_info { struct watchers {
int max_fd; /* Highest fd number to pass to select() */
int num_probing; /* Number of connections currently probing
* We use this to know if we need to time out of
* select() */
gap_array* probing_list; /* Pointers to cnx that are in probing mode */
fd_set fds_r, fds_w; /* reference fd sets (used to init working copies) */ fd_set fds_r, fds_w; /* reference fd sets (used to init working copies) */
cnx_collection* collection; /* Collection of connections linked to this loop */ int max_fd; /* Highest fd number to pass to select() */
time_t next_timeout; /* time at which next UDP connection times out */
}; };
static void watchers_init(watchers** w, struct listen_endpoint* listen_sockets,
static int tidy_connection(struct connection *cnx, struct select_info* fd_info) int num_addr_listen)
{ {
int i; *w = malloc(sizeof(**w));
fd_set* fds = &fd_info->fds_r; FD_ZERO(&(*w)->fds_r);
fd_set* fds2 = &fd_info->fds_w; FD_ZERO(&(*w)->fds_w);
for (i = 0; i < 2; i++) { for (int i = 0; i < num_addr_listen; i++) {
if (cnx->q[i].fd != -1) { watchers_add_read(*w, listen_sockets[i].socketfd);
print_message(msg_fd, "closing fd %d\n", cnx->q[i].fd); set_nonblock(listen_sockets[i].socketfd);
}
}
FD_CLR(cnx->q[i].fd, fds); void watchers_add_read(watchers* w, int fd)
FD_CLR(cnx->q[i].fd, fds2); {
close(cnx->q[i].fd); FD_SET(fd, &w->fds_r);
if (cnx->q[i].deferred_data) if (fd > w->max_fd)
free(cnx->q[i].deferred_data); w->max_fd = fd + 1;
} }
void watchers_del_read(watchers* w, int fd)
{
FD_CLR(fd, &w->fds_r);
} }
collection_remove_cnx(fd_info->collection, cnx);
return 0; void watchers_add_write(watchers* w, int fd)
{
FD_SET(fd, &w->fds_w);
if (fd > w->max_fd)
w->max_fd = fd + 1;
} }
void watchers_del_write(watchers* w, int fd)
{
FD_CLR(fd, &w->fds_w);
}
/* To remove after moving UDP lookups to hash table */
int watchers_maxfd(watchers* w)
{
return w->max_fd;
}
/* /end watchers */
/* if fd becomes higher than FD_SETSIZE, things won't work so well with FD_SET /* if fd becomes higher than FD_SETSIZE, things won't work so well with FD_SET
* and FD_CLR. Need to drop connections if we go above that limit */ * and FD_CLR. Need to drop connections if we go above that limit */
#warning strange things will happen if more than FD_SETSIZE descriptors are used
/* This test is currently not done */
static int fd_is_in_range(int fd) { static int fd_is_in_range(int fd) {
if (fd >= FD_SETSIZE) { if (fd >= FD_SETSIZE) {
print_message(msg_system_error, "too many open file descriptor to monitor them all -- dropping connection\n"); print_message(msg_system_error, "too many open file descriptor to monitor them all -- dropping connection\n");
@ -89,402 +107,10 @@ static int fd_is_in_range(int fd) {
return 1; return 1;
} }
/* Accepts a connection from the main socket and assigns it to an empty slot.
* If no slots are available, allocate another few. If that fails, drop the
* connexion */
static struct connection* accept_new_connection(int listen_socket, struct cnx_collection *collection)
{
int in_socket, res;
print_message(msg_fd, "accepting from %d\n", listen_socket);
in_socket = accept(listen_socket, 0, 0);
CHECK_RES_RETURN(in_socket, "accept", NULL);
if (!fd_is_in_range(in_socket)) {
close(in_socket);
return NULL;
}
res = set_nonblock(in_socket);
if (res == -1) {
close(in_socket);
return NULL;
}
struct connection* cnx = collection_alloc_cnx_from_fd(collection, in_socket);
if (!cnx) {
close(in_socket);
return NULL;
}
return cnx;
}
/* Connect queue 1 of connection to SSL; returns new file descriptor */
static int connect_queue(struct connection* cnx,
struct select_info* fd_info)
{
struct queue *q = &cnx->q[1];
q->fd = connect_addr(cnx, cnx->q[0].fd, NON_BLOCKING);
if ((q->fd != -1) && fd_is_in_range(q->fd)) {
log_connection(NULL, cnx);
flush_deferred(q);
if (q->deferred_data) {
FD_SET(q->fd, &fd_info->fds_w);
FD_CLR(cnx->q[0].fd, &fd_info->fds_r);
}
FD_SET(q->fd, &fd_info->fds_r);
collection_add_fd(fd_info->collection, cnx, q->fd);
return q->fd;
} else {
tidy_connection(cnx, fd_info);
return -1;
}
}
/* shovels data from active fd to the other
returns after one socket closed or operation would block
*/
static void shovel(struct connection *cnx, int active_fd, struct select_info* fd_info)
{
struct queue *read_q, *write_q;
read_q = &cnx->q[active_fd];
write_q = &cnx->q[1-active_fd];
print_message(msg_fd, "activity on fd%d\n", read_q->fd);
switch(fd2fd(write_q, read_q)) {
case -1:
case FD_CNXCLOSED:
tidy_connection(cnx, fd_info);
break;
case FD_STALLED:
FD_SET(write_q->fd, &fd_info->fds_w);
FD_CLR(read_q->fd, &fd_info->fds_r);
break;
default: /* Nothing */
break;
}
}
/* shovels data from one fd to the other and vice-versa
returns after one socket closed
*/
static void shovel_single(struct connection *cnx)
{
fd_set fds_r, fds_w;
int res, i;
int max_fd = MAX(cnx->q[0].fd, cnx->q[1].fd) + 1;
FD_ZERO(&fds_r);
FD_ZERO(&fds_w);
while (1) {
for (i = 0; i < 2; i++) {
if (cnx->q[i].deferred_data_size) {
FD_SET(cnx->q[i].fd, &fds_w);
FD_CLR(cnx->q[1-i].fd, &fds_r);
} else {
FD_CLR(cnx->q[i].fd, &fds_w);
FD_SET(cnx->q[1-i].fd, &fds_r);
}
}
res = select(
max_fd,
&fds_r,
&fds_w,
NULL,
NULL
);
CHECK_RES_DIE(res, "select");
for (i = 0; i < 2; i++) {
if (FD_ISSET(cnx->q[i].fd, &fds_w)) {
res = flush_deferred(&cnx->q[i]);
if ((res == -1) && ((errno == EPIPE) || (errno == ECONNRESET))) {
print_message(msg_fd, "%s socket closed\n", i ? "server" : "client");
return;
}
}
if (FD_ISSET(cnx->q[i].fd, &fds_r)) {
res = fd2fd(&cnx->q[1-i], &cnx->q[i]);
if (!res) {
print_message(msg_fd, "socket closed\n");
return;
}
}
}
}
}
/* Child process that makes internal connection and proxies
*/
static void connect_proxy(struct connection *cnx)
{
int in_socket;
int out_socket;
/* Minimize the file descriptor value to help select() */
in_socket = dup(cnx->q[0].fd);
if (in_socket == -1) {
in_socket = cnx->q[0].fd;
} else {
close(cnx->q[0].fd);
cnx->q[0].fd = in_socket;
}
/* Connect the target socket */
out_socket = connect_addr(cnx, in_socket, BLOCKING);
CHECK_RES_DIE(out_socket, "connect");
cnx->q[1].fd = out_socket;
log_connection(NULL, cnx);
shovel_single(cnx);
close(in_socket);
close(out_socket);
print_message(msg_fd, "connection closed down\n");
exit(0);
}
/* Removes cnx from probing list */
static void remove_probing_cnx(struct select_info* fd_info, struct connection* cnx)
{
gap_remove_ptr(fd_info->probing_list, cnx, fd_info->num_probing);
fd_info->num_probing--;
}
static void add_probing_cnx(struct select_info* fd_info, struct connection* cnx)
{
gap_set(fd_info->probing_list, fd_info->num_probing, cnx);
fd_info->num_probing++;
}
/* Process read activity on a socket in probe state
* IN/OUT cnx: connection data, updated if connected
* IN/OUT info: updated if connected
* */
static void probing_read_process(struct connection* cnx,
struct select_info* fd_info)
{
int res;
/* If timed out it's SSH, otherwise the client sent
* data so probe the protocol */
if ((cnx->probe_timeout < time(NULL))) {
cnx->proto = timeout_protocol();
print_message(msg_fd, "timed out, connect to %s\n", cnx->proto->name);
} else {
res = probe_client_protocol(cnx);
if (res == PROBE_AGAIN)
return;
}
remove_probing_cnx(fd_info, cnx);
cnx->state = ST_SHOVELING;
/* libwrap check if required for this protocol */
if (cnx->proto->service &&
check_access_rights(cnx->q[0].fd, cnx->proto->service)) {
tidy_connection(cnx, fd_info);
res = -1;
} else if (cnx->proto->fork) {
switch (fork()) {
case 0: /* child */
/* TODO: close all file descriptors except 2 */
/* free(cnx); */
connect_proxy(cnx);
exit(0);
case -1: print_message(msg_system_error, "fork failed: err %d: %s\n", errno, strerror(errno));
break;
default: /* parent */
break;
}
tidy_connection(cnx, fd_info);
res = -1;
} else {
res = connect_queue(cnx, fd_info);
}
if (res >= fd_info->max_fd)
fd_info->max_fd = res + 1;;
}
/* Returns the queue index that contains the specified file descriptor */
int active_queue(struct connection* cnx, int fd)
{
if (cnx->q[0].fd == fd) return 0;
if (cnx->q[1].fd == fd) return 1;
print_message(msg_int_error, "file descriptor %d not found in connection object\n", fd);
return -1;
}
/* Process a connection that is active in read */
static void tcp_read_process(struct select_info* fd_info,
int fd)
{
cnx_collection* collection = fd_info->collection;
struct connection* cnx = collection_get_cnx_from_fd(collection, fd);
/* Determine active queue (0 or 1): if fd is that of q[1], active_q = 1,
* otherwise it's 0 */
int active_q = active_queue(cnx, fd);
switch (cnx->state) {
case ST_PROBING:
if (active_q == 1) {
print_message(msg_int_error, "Activity on fd2 while probing, impossible\n");
dump_connection(cnx);
exit(1);
}
probing_read_process(cnx, fd_info);
break;
case ST_SHOVELING:
shovel(cnx, active_q, fd_info);
break;
default: /* illegal */
print_message(msg_int_error, "Illegal connection state %d\n", cnx->state);
dump_connection(cnx);
exit(1);
}
}
static void cnx_read_process(struct select_info* fd_info, int fd)
{
cnx_collection* collection = fd_info->collection;
struct connection* cnx = collection_get_cnx_from_fd(collection, fd);
switch (cnx->type) {
case SOCK_STREAM:
tcp_read_process(fd_info, fd);
break;
case SOCK_DGRAM:
udp_s2c_forward(cnx);
break;
default:
print_message(msg_int_error, "cnx_read_process: Illegal connection type %d\n", cnx->type);
dump_connection(cnx);
exit(1);
}
}
/* Process a connection that is active in write */
static void cnx_write_process(struct select_info* fd_info, int fd)
{
struct connection* cnx = collection_get_cnx_from_fd(fd_info->collection, fd);
int res;
int queue = active_queue(cnx, fd);
res = flush_deferred(&cnx->q[queue]);
if ((res == -1) && ((errno == EPIPE) || (errno == ECONNRESET))) {
if (cnx->state == ST_PROBING) remove_probing_cnx(fd_info, cnx);
tidy_connection(cnx, fd_info);
} else {
/* If no deferred data is left, stop monitoring the fd
* for write, and restart monitoring the other one for reads*/
if (!cnx->q[queue].deferred_data_size) {
FD_CLR(cnx->q[queue].fd, &fd_info->fds_w);
FD_SET(cnx->q[1-queue].fd, &fd_info->fds_r);
}
}
}
/* Process a connection that accepts a socket
* (For UDP, this means all traffic coming from remote clients)
* */
void cnx_accept_process(struct select_info* fd_info, struct listen_endpoint* listen_socket)
{
int fd = listen_socket->socketfd;
int type = listen_socket->type;
struct connection* cnx;
int new_fd;
switch (type) {
case SOCK_STREAM:
cnx = accept_new_connection(fd, fd_info->collection);
if (cnx) {
add_probing_cnx(fd_info, cnx);
new_fd = cnx->q[0].fd;
}
break;
case SOCK_DGRAM:
new_fd = udp_c2s_forward(fd, fd_info->collection, fd_info->max_fd);
print_message(msg_fd, "new_fd %d\n", new_fd);
if (new_fd == -1)
return;
break;
default:
print_message(msg_int_error, "Inconsistent cnx type: %d\n", type);
exit(1);
return;
}
FD_SET(new_fd, &fd_info->fds_r);
if (new_fd >= fd_info->max_fd)
fd_info->max_fd = new_fd + 1;
}
/* Check all connections to see if a UDP connections has timed out, then free
* it. At the same time, keep track of the closest, next timeout. Only do the
* search through connections if that timeout actually happened. If the
* connection that would have timed out has had activity, it doesn't matter: we
* go through connections to find the next timeout, which was needed anyway. */
static void udp_timeouts(struct select_info* fd_info)
{
time_t now = time(NULL);
if (now < fd_info->next_timeout) return;
time_t next_timeout = INT_MAX;
for (int i = 0; i < fd_info->max_fd; i++) {
/* if it's either in read or write set, there is a connection
* behind that file descriptor */
if (FD_ISSET(i, &fd_info->fds_r) || FD_ISSET(i, &fd_info->fds_w)) {
struct connection* cnx = collection_get_cnx_from_fd(fd_info->collection, i);
if (cnx) {
time_t timeout = udp_timeout(cnx);
if (!timeout) continue; /* Not a UDP connection */
if (cnx && (timeout <= now)) {
print_message(msg_fd, "timed out UDP %d\n", cnx->target_sock);
close(cnx->target_sock);
FD_CLR(i, &fd_info->fds_r);
FD_CLR(i, &fd_info->fds_w);
collection_remove_cnx(fd_info->collection, cnx);
} else {
if (timeout < next_timeout) next_timeout = timeout;
}
}
}
}
if (next_timeout != INT_MAX)
fd_info->next_timeout = next_timeout;
}
/* Main loop: the idea is as follow: /* Main loop: the idea is as follow:
* - fds_r and fds_w contain the file descriptors to monitor in read and write * - fds_r and fds_w contain the file descriptors to monitor in read and write
@ -502,43 +128,33 @@ static void udp_timeouts(struct select_info* fd_info)
*/ */
void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen) void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
{ {
struct select_info fd_info = {0}; struct loop_info fd_info = {0};
fd_set readfds, writefds; /* working read and write fd sets */ fd_set readfds, writefds; /* working read and write fd sets */
struct timeval tv; struct timeval tv;
int i, res; int i, res;
fd_info.num_probing = 0; fd_info.num_probing = 0;
FD_ZERO(&fd_info.fds_r);
FD_ZERO(&fd_info.fds_w);
fd_info.probing_list = gap_init(0); fd_info.probing_list = gap_init(0);
for (i = 0; i < num_addr_listen; i++) { watchers_init(&fd_info.watchers, listen_sockets, num_addr_listen);
FD_SET(listen_sockets[i].socketfd, &fd_info.fds_r);
set_nonblock(listen_sockets[i].socketfd);
}
fd_info.max_fd = listen_sockets[num_addr_listen-1].socketfd + 1;
fd_info.collection = collection_init(fd_info.max_fd); fd_info.collection = collection_init(fd_info.watchers->max_fd);
while (1) while (1)
{ {
memset(&tv, 0, sizeof(tv)); memset(&tv, 0, sizeof(tv));
tv.tv_sec = cfg.timeout; tv.tv_sec = cfg.timeout;
memcpy(&readfds, &fd_info.fds_r, sizeof(readfds)); memcpy(&readfds, &fd_info.watchers->fds_r, sizeof(readfds));
memcpy(&writefds, &fd_info.fds_w, sizeof(writefds)); memcpy(&writefds, &fd_info.watchers->fds_w, sizeof(writefds));
print_message(msg_fd, "selecting... max_fd=%d num_probing=%d\n", print_message(msg_fd, "selecting... max_fd=%d num_probing=%d\n",
fd_info.max_fd, fd_info.num_probing); fd_info.watchers->max_fd, fd_info.num_probing);
res = select(fd_info.max_fd, &readfds, &writefds, res = select(fd_info.watchers->max_fd, &readfds, &writefds,
NULL, fd_info.num_probing ? &tv : NULL); NULL, fd_info.num_probing ? &tv : NULL);
if (res < 0) if (res < 0)
perror("select"); perror("select");
/* UDP timeouts: clear out connections after some idle time */
udp_timeouts(&fd_info);
/* Check main socket for new connections */ /* Check main socket for new connections */
for (i = 0; i < num_addr_listen; i++) { for (i = 0; i < num_addr_listen; i++) {
if (FD_ISSET(listen_sockets[i].socketfd, &readfds)) { if (FD_ISSET(listen_sockets[i].socketfd, &readfds)) {
@ -550,7 +166,7 @@ void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
} }
/* Check all sockets for write activity */ /* Check all sockets for write activity */
for (i = 0; i < fd_info.max_fd; i++) { for (i = 0; i < fd_info.watchers->max_fd; i++) {
if (FD_ISSET(i, &writefds)) { if (FD_ISSET(i, &writefds)) {
cnx_write_process(&fd_info, i); cnx_write_process(&fd_info, i);
} }
@ -572,11 +188,11 @@ void main_loop(struct listen_endpoint listen_sockets[], int num_addr_listen)
} }
/* Check all sockets for read activity */ /* Check all sockets for read activity */
for (i = 0; i < fd_info.max_fd; i++) { for (i = 0; i < fd_info.watchers->max_fd; i++) {
/* Check if it's active AND currently monitored (if a connection /* Check if it's active AND currently monitored (if a connection
* died, it gets tidied, which closes both sockets, but readfs does * died, it gets tidied, which closes both sockets, but readfs does
* not know about that */ * not know about that */
if (FD_ISSET(i, &readfds) && FD_ISSET(i, &fd_info.fds_r)) { if (FD_ISSET(i, &readfds) && FD_ISSET(i, &fd_info.watchers->fds_r)) {
cnx_read_process(&fd_info, i); cnx_read_process(&fd_info, i);
} }
} }

View File

@ -20,12 +20,67 @@
*/ */
#include <limits.h>
#include "common.h" #include "common.h"
#include "probe.h" #include "probe.h"
#include "sslh-conf.h" #include "sslh-conf.h"
#include "udp-listener.h" #include "udp-listener.h"
/* returns date at which this socket times out. */
static int udp_timeout(struct connection* cnx)
{
if (cnx->type != SOCK_DGRAM) return 0; /* Not a UDP connection */
return cnx->proto->udp_timeout + cnx->last_active;
}
/* Check all connections to see if a UDP connections has timed out, then free
* it. At the same time, keep track of the closest, next timeout. Only do the
* search through connections if that timeout actually happened. If the
* connection that would have timed out has had activity, it doesn't matter: we
* go through connections to find the next timeout, which was needed anyway.
*
* This gets called every time a UDP packet is received from the outside, i.e.
* every time we might need to free up resources. If no packets come in, we
* don't time out anything, as we don't need the resources.
*
* TODO: use a better algorithm to avoid going through all connections each
* time.
*
* */
void udp_timeouts(struct loop_info* fd_info)
{
time_t now = time(NULL);
if (now < fd_info->next_timeout) return;
time_t next_timeout = INT_MAX;
for (int i = 0; i < watchers_maxfd(fd_info->watchers); i++) {
/* if it's either in read or write set, there is a connection
* behind that file descriptor */
struct connection* cnx = collection_get_cnx_from_fd(fd_info->collection, i);
if (cnx) {
time_t timeout = udp_timeout(cnx);
if (!timeout) continue; /* Not a UDP connection */
if (cnx && (timeout <= now)) {
print_message(msg_fd, "timed out UDP %d\n", cnx->target_sock);
close(cnx->target_sock);
watchers_del_read(fd_info->watchers, i);
watchers_del_write(fd_info->watchers, i);
collection_remove_cnx(fd_info->collection, cnx);
} else {
if (timeout < next_timeout) next_timeout = timeout;
}
}
}
if (next_timeout != INT_MAX)
fd_info->next_timeout = next_timeout;
}
/* Find if the specified source has been seen before. -1 if not found /* Find if the specified source has been seen before. -1 if not found
* *
* TODO This is linear search and needs to be changed to something better for * TODO This is linear search and needs to be changed to something better for
@ -53,12 +108,14 @@ static int known_source(cnx_collection* collection, int max_fd, struct sockaddr*
* Returns: >= 0 sockfd of newly allocated socket, for new connections * Returns: >= 0 sockfd of newly allocated socket, for new connections
* -1 otherwise * -1 otherwise
* */ * */
int udp_c2s_forward(int sockfd, cnx_collection* collection, int max_fd) int udp_c2s_forward(int sockfd, struct loop_info* fd_info)
{ {
char addr_str[NI_MAXHOST+1+NI_MAXSERV+1]; char addr_str[NI_MAXHOST+1+NI_MAXSERV+1];
struct sockaddr src_addr; struct sockaddr src_addr;
struct addrinfo addrinfo; struct addrinfo addrinfo;
struct sslhcfg_protocols_item* proto; struct sslhcfg_protocols_item* proto;
cnx_collection* collection = fd_info->collection;
int max_fd = watchers_maxfd(fd_info->watchers);
struct connection* cnx; struct connection* cnx;
ssize_t len; ssize_t len;
socklen_t addrlen; socklen_t addrlen;
@ -67,6 +124,8 @@ int udp_c2s_forward(int sockfd, cnx_collection* collection, int max_fd)
This will do. Dynamic allocation is possible with the MSG_PEEK flag in recvfrom(2), but that'd imply This will do. Dynamic allocation is possible with the MSG_PEEK flag in recvfrom(2), but that'd imply
malloc/free overhead for each packet, when really 64K is not that much */ malloc/free overhead for each packet, when really 64K is not that much */
udp_timeouts(fd_info);
addrlen = sizeof(src_addr); addrlen = sizeof(src_addr);
len = recvfrom(sockfd, data, sizeof(data), 0, &src_addr, &addrlen); len = recvfrom(sockfd, data, sizeof(data), 0, &src_addr, &addrlen);
if (len < 0) { if (len < 0) {
@ -90,6 +149,8 @@ int udp_c2s_forward(int sockfd, cnx_collection* collection, int max_fd)
} }
out = socket(proto->saddr->ai_family, SOCK_DGRAM, 0); out = socket(proto->saddr->ai_family, SOCK_DGRAM, 0);
res = set_nonblock(out);
CHECK_RES_RETURN(res, "udp:socket:nonblock", -1);
struct connection* cnx = collection_alloc_cnx_from_fd(collection, out); struct connection* cnx = collection_alloc_cnx_from_fd(collection, out);
if (!cnx) return -1; if (!cnx) return -1;
target = out; target = out;
@ -119,18 +180,10 @@ void udp_s2c_forward(struct connection* cnx)
int res; int res;
res = recvfrom(sockfd, data, sizeof(data), 0, NULL, NULL); res = recvfrom(sockfd, data, sizeof(data), 0, NULL, NULL);
if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))) return;
CHECK_RES_DIE(res, "udp_listener/recvfrom"); CHECK_RES_DIE(res, "udp_listener/recvfrom");
res = sendto(cnx->local_endpoint, data, res, 0, res = sendto(cnx->local_endpoint, data, res, 0,
&cnx->client_addr, cnx->addrlen); &cnx->client_addr, cnx->addrlen);
cnx->last_active = time(NULL); cnx->last_active = time(NULL);
} }
/* returns date at which this socket times out. */
int udp_timeout(struct connection* cnx)
{
if (cnx->type != SOCK_DGRAM) return 0; /* Not a UDP connection */
return cnx->proto->udp_timeout + cnx->last_active;
}

View File

@ -2,6 +2,8 @@
#define UDPLISTENER_H #define UDPLISTENER_H
#include "collection.h" #include "collection.h"
#include "processes.h"
#include "common.h"
/* UDP listener: upon incoming packet, find where it should go /* UDP listener: upon incoming packet, find where it should go
* This is run in its own process and never returns. * This is run in its own process and never returns.
@ -14,15 +16,9 @@ void udp_listener(struct listen_endpoint* endpoint, int num_endpoints, int activ
* Returns: >= 0 sockfd of newly allocated socket, for new connections * Returns: >= 0 sockfd of newly allocated socket, for new connections
* -1 otherwise * -1 otherwise
* */ * */
int udp_c2s_forward(int sockfd, cnx_collection* collection, int max_fd); int udp_c2s_forward(int sockfd, struct loop_info* fd_info);
/* Process UDP coming from inside (server towards client) */ /* Process UDP coming from inside (server towards client) */
void udp_s2c_forward(struct connection* cnx); void udp_s2c_forward(struct connection* cnx);
/* returns how many seconds before socket times out. Negative if timed out
* already.
*/
int udp_timeout(struct connection* cnx);
#endif /* UDPLISTENER_H */ #endif /* UDPLISTENER_H */