diff --git a/src/eloop.c b/src/eloop.c index 60e8a9a..4335c69 100644 --- a/src/eloop.c +++ b/src/eloop.c @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -100,6 +101,15 @@ struct ev_timer { void *data; }; +struct ev_counter { + unsigned long ref; + ev_counter_cb cb; + void *data; + + int fd; + struct ev_fd *efd; +}; + int ev_eloop_new_eloop(struct ev_eloop *loop, struct ev_eloop **out) { struct ev_eloop *el; @@ -744,6 +754,191 @@ int ev_eloop_update_timer(struct ev_timer *timer, return 0; } +/* + * Counter Sources + * Counter sources are a very basic event notification mechanism. It is based + * around the eventfd() system call on linux machines. Internally, there is a + * 64bit unsigned integer that can be increased by the caller. By default it is + * set to 0. If it is non-zero, the event-fd will be notified and the + * user-defined callback is called. The callback gets as argument the current + * state of the counter and the counter is reset to 0. + * + * If the internal counter would overflow, an increase() fails silently so an + * overflow will never occur, however, you may loose events this way. This can + * be ignored when increasing with small values, only. + */ + +int ev_counter_new(struct ev_counter **out, ev_counter_cb cb, void *data) +{ + struct ev_counter *cnt; + int ret; + + if (!out) + return -EINVAL; + + cnt = malloc(sizeof(*cnt)); + if (!cnt) + return -ENOMEM; + memset(cnt, 0, sizeof(*cnt)); + cnt->ref = 1; + cnt->cb = cb; + cnt->data = data; + + cnt->fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + if (cnt->fd < 0) { + log_error("cannot create eventfd (%d): %m", errno); + ret = -EFAULT; + goto err_free; + } + + ret = ev_fd_new(&cnt->efd); + if (ret) + goto err_close; + + *out = cnt; + return 0; + +err_close: + close(cnt->fd); +err_free: + free(cnt); + return ret; +} + +void ev_counter_ref(struct ev_counter *cnt) +{ + if (!cnt || !cnt->ref) + return; + + ++cnt->ref; +} + +void ev_counter_unref(struct ev_counter *cnt) +{ + if (!cnt || !cnt->ref || --cnt->ref) + return; + + ev_fd_unref(cnt->efd); + close(cnt->fd); + free(cnt); +} + +bool ev_counter_is_bound(struct ev_counter *cnt) +{ + return cnt && ev_fd_is_bound(cnt->efd); +} + +void ev_counter_set_cb_data(struct ev_counter *cnt, ev_counter_cb cb, + void *data) +{ + if (!cnt) + return; + + cnt->cb = cb; + cnt->data = data; +} + +void ev_counter_inc(struct ev_counter *cnt, uint64_t val) +{ + int ret; + + if (!cnt || !val) + return; + + if (val == 0xffffffffffffffffULL) { + log_warning("increasing counter with invalid value %llu", val); + return; + } + + ret = write(cnt->fd, &val, sizeof(val)); + if (ret < 0) { + if (errno == EAGAIN) + log_warning("eventfd overflow while writing %llu", val); + else + log_warning("eventfd write error (%d): %m", errno); + } else if (ret != sizeof(val)) { + log_warning("wrote %d bytes instead of 8 to eventdfd", ret); + } +} + +int ev_eloop_new_counter(struct ev_eloop *eloop, struct ev_counter **out, + ev_counter_cb cb, void *data) +{ + int ret; + struct ev_counter *cnt; + + if (!eloop || !out) + return -EINVAL; + + ret = ev_counter_new(&cnt, cb, data); + if (ret) + return ret; + + ret = ev_eloop_add_counter(eloop, cnt); + if (ret) { + ev_counter_unref(cnt); + return ret; + } + + ev_counter_unref(cnt); + return 0; +} + +static void counter_event(struct ev_fd *fd, int mask, void *data) +{ + struct ev_counter *cnt = data; + int ret; + uint64_t val; + + if (mask & (EV_HUP | EV_ERR)) { + log_warning("HUP/ERR on eventfd"); + return; + } + + if (!(mask & EV_READABLE)) + return; + + ret = read(cnt->fd, &val, sizeof(val)); + if (ret < 0) { + if (errno != EAGAIN) + log_warning("reading eventfd failed (%d): %m", errno); + } else if (ret == 0) { + log_warning("EOF on eventfd"); + } else if (ret != sizeof(val)) { + log_warning("read %d bytes instead of 8 on eventfd", ret); + } else if (cnt->cb) { + cnt->cb(cnt, val, cnt->data); + } +} + +int ev_eloop_add_counter(struct ev_eloop *eloop, struct ev_counter *cnt) +{ + int ret; + + if (!eloop || !cnt) + return -EINVAL; + + if (ev_fd_is_bound(cnt->efd)) + return -EALREADY; + + ret = ev_eloop_add_fd(eloop, cnt->efd, cnt->fd, EV_READABLE, + counter_event, cnt); + if (ret) + return ret; + + ev_counter_ref(cnt); + return 0; +} + +void ev_eloop_rm_counter(struct ev_counter *cnt) +{ + if (!cnt || !ev_fd_is_bound(cnt->efd)) + return; + + ev_eloop_rm_fd(cnt->efd); + ev_counter_unref(cnt); +} + int ev_eloop_new(struct ev_eloop **out) { struct ev_eloop *loop; diff --git a/src/eloop.h b/src/eloop.h index 95f7410..ed60b7f 100644 --- a/src/eloop.h +++ b/src/eloop.h @@ -44,6 +44,7 @@ struct ev_eloop; struct ev_idle; struct ev_fd; struct ev_timer; +struct ev_counter; typedef void (*ev_idle_cb) (struct ev_idle *idle, void *data); typedef void (*ev_fd_cb) (struct ev_fd *fd, int mask, void *data); @@ -51,6 +52,8 @@ typedef void (*ev_signal_shared_cb) (struct ev_eloop *eloop, struct signalfd_siginfo *info, void *data); typedef void (*ev_timer_cb) (struct ev_timer *timer, uint64_t num, void *data); +typedef void (*ev_counter_cb) + (struct ev_counter *cnt, uint64_t num, void *data); enum ev_eloop_flags { EV_READABLE = 0x01, @@ -123,4 +126,20 @@ void ev_eloop_rm_timer(struct ev_timer *timer); int ev_eloop_update_timer(struct ev_timer *timer, const struct itimerspec *spec); +/* counter sources */ + +int ev_counter_new(struct ev_counter **out, ev_counter_cb, void *data); +void ev_counter_ref(struct ev_counter *cnt); +void ev_counter_unref(struct ev_counter *cnt); + +bool ev_counter_is_bound(struct ev_counter *cnt); +void ev_counter_set_cb_data(struct ev_counter *cnt, ev_counter_cb cb, + void *data); +void ev_counter_inc(struct ev_counter *cnt, uint64_t val); + +int ev_eloop_new_counter(struct ev_eloop *eloop, struct ev_counter **out, + ev_counter_cb cb, void *data); +int ev_eloop_add_counter(struct ev_eloop *eloop, struct ev_counter *cnt); +void ev_eloop_rm_counter(struct ev_counter *cnt); + #endif /* EV_ELOOP_H */