Simple coroutine library integrated with IO event loop (libevent) / objects in plain C Snapshot
|
#include <evthread.h>
#include <errno.h>
#include <unistd.h>
#include <sys/time.h>
#include <nutils/ioutils.h>
#include <butils/logg.h>
Go to the source code of this file.
Defines | |
#define | EVENT_ID_HAS_IO_EVENT 0 |
#define | EVENT_ID_HAS_IO_ERROR 1 |
#define | TIMER_ID_COMM_TIMEOUT 2 |
#define | TIMER_ID_IDLE_TIMEOUT 3 |
Functions | |
EVLOOP * | EVLOOP_init (STACKS *stacks) |
int | EVLOOP_run (EVLOOP *loop) |
int | EVLOOP_break (EVLOOP *loop) |
int | EVTHREAD_free (EVTHREAD *thread) |
static void | evthread_proc (VALUES *ctx) |
static void | thread_timer_cb (int fd, short event, void *ctx) |
EVTHREAD * | EVTHREAD_init (EVLOOP *loop, EVTHREAD_PROC thread_proc, void *user_ctx) |
int | EVTHREAD_start (EVTHREAD *thread, struct tagEVSOCKET *socket) |
int | EVTHREAD_delay (EVTHREAD *thread, struct timeval delay) |
void | EVTHREAD_OBJECT_init (EVTHREAD_OBJECT *obj, int type, EVTHREAD *owner) |
void | EVTHREAD_OBJECT_free (EVTHREAD_OBJECT *obj) |
static void | timer_cb (int fd, short event, void *ctx) |
EVTIMER * | EVTIMER_init (EVTHREAD *thread, int timer_id, struct timeval tm) |
int | EVTIMER_start (EVTIMER *ret) |
int | EVTIMER_cancel (EVTIMER *timer) |
int | EVTIMER_free (EVTIMER *timer) |
static void | socket_cb (int fd, short event, void *ctx) |
EVSOCKET * | EVSOCKET_init (EVTHREAD *thread, int fd, int is_connected) |
int | EVSOCKET_close (EVSOCKET *socket) |
void | EVSOCKET_set_idle_timeout (EVSOCKET *socket, struct timeval timeout) |
int | EVSOCKET_connect (EVSOCKET *socket, struct sockaddr *address, socklen_t socklen, struct timeval timeout) |
static int | EVSOCKET_recv_internal (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout) |
static int | EVSOCKET_send_internal (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout) |
int | EVSOCKET_recv (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout) |
int | EVSOCKET_recv_all (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout) |
int | EVSOCKET_send (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout) |
static void | socket_listener_cb (int fd, short event, void *ctx) |
EVTCPACCEPTOR * | EVTCPACCEPTOR_init_ex (EVLOOP *loop, SOCKADDR *addr, int listener_backlog, EVTHREAD_FACTORY factory, int read_buffer_size, int send_buffer_size, void *ctx) |
EVTCPACCEPTOR * | EVTCPACCEPTOR_init (EVLOOP *loop, int fd, EVTHREAD_FACTORY factory, int read_buffer_size, int send_buffer_size, void *ctx) |
void | EVTCPACCEPTOR_close (EVTCPACCEPTOR *acceptor) |
void | custom_timeout_handling (struct event_base *base, struct timeval *tv) |
#define EVENT_ID_HAS_IO_ERROR 1 |
Definition at line 13 of file evthread.c.
#define EVENT_ID_HAS_IO_EVENT 0 |
Definition at line 12 of file evthread.c.
#define TIMER_ID_COMM_TIMEOUT 2 |
Definition at line 14 of file evthread.c.
#define TIMER_ID_IDLE_TIMEOUT 3 |
Definition at line 15 of file evthread.c.
/**
 * Timeout hook that intentionally does nothing.
 *
 * Both arguments are accepted only to satisfy the signature; they are
 * marked unused and never read.
 *
 * Definition at line 731 of file evthread.c.
 *
 * @param base  event base (ignored).
 * @param tv    timeout value (ignored).
 */
void custom_timeout_handling(struct event_base *base, struct timeval *tv)
{
    M_UNUSED(tv);
    M_UNUSED(base);
}
static int EVSOCKET_recv_internal | ( | EVSOCKET * | socket, |
void * | buf, | ||
size_t | buf_size, | ||
int | flags, | ||
struct timeval | timeout | ||
) | [static] |
Definition at line 414 of file evthread.c.
{ int rt; int has_event = 0; VALUES *rvalues; int event_id; if ( socket->state != EVSOCKET_STATE_CONNECTED) { return -1; } r_again: do { rt = recv( socket->fd, buf, buf_size, flags); } while(rt == -1 && errno == EINTR); if (rt == -1) { if (errno == EAGAIN) { MLOG_TRACE( "socket %d read has blocked", socket->fd ); socket->state = EVSOCKET_STATE_READING; socket->timer_io_timeout = EVTIMER_init( socket->thread, TIMER_ID_COMM_TIMEOUT, timeout ); if (socket->timer_io_timeout) { EVTIMER_start(socket->timer_io_timeout); } if (!has_event) { event_add( &socket->read_event, 0 ); has_event = 1; } CTHREAD_yield( &rvalues, 0 ); VALUES_scan( rvalues, "%d", &event_id ); if (socket->timer_io_timeout) { EVTIMER_free(socket->timer_io_timeout); socket->timer_io_timeout = 0; } switch(event_id) { case EVENT_ID_HAS_IO_EVENT: MLOG_TRACE( "socket %d received read event", socket->fd ); goto r_again; default: MLOG_DEBUG( "socket %d read timed out", socket->fd ); socket->state = EVSOCKET_STATE_ERROR; close( socket->fd ); socket->fd = -1; rt = -1; break; } } else { MLOG_DEBUG( "socket %d read error. errno %d", socket->fd, errno ); } return -1; } if (has_event) { event_del( &socket->read_event ); } if (rt != -1) { socket->state = EVSOCKET_STATE_CONNECTED; } if (rt >= 0) { if (socket->timer_idle_timeout) { EVTIMER_cancel(socket->timer_idle_timeout); } } return rt; }
static int EVSOCKET_send_internal | ( | EVSOCKET * | socket, |
void * | buf, | ||
size_t | buf_size, | ||
int | flags, | ||
struct timeval | timeout | ||
) | [static] |
Definition at line 491 of file evthread.c.
{ int rt; int has_event = 0; VALUES *rvalues; int event_id; if ( socket->state != EVSOCKET_STATE_CONNECTED) { return -1; } w_again: do { rt = send( socket->fd, buf, buf_size, flags); } while(rt == -1 && errno == EINTR); if (rt == -1) { if (errno == EAGAIN) { socket->state = EVSOCKET_STATE_WRITING; socket->timer_io_timeout = EVTIMER_init( socket->thread, TIMER_ID_COMM_TIMEOUT, timeout ); if (socket->timer_io_timeout) { EVTIMER_start(socket->timer_io_timeout); } if (!has_event) { event_add( &socket->write_event, 0 ); has_event = 1; } CTHREAD_yield( &rvalues, 0 ); VALUES_scan( rvalues, "%d", &event_id ); if (socket->timer_io_timeout) { EVTIMER_free(socket->timer_io_timeout); socket->timer_io_timeout = 0; } switch(event_id) { case EVENT_ID_HAS_IO_EVENT: MLOG_TRACE( "socket %d received write event", socket->fd ); goto w_again; default: MLOG_DEBUG( "socket %d write timed out", socket->fd ); socket->state = EVSOCKET_STATE_ERROR; close( socket->fd ); socket->fd = -1; rt = -1; break; } } } if (has_event) { event_del( &socket->write_event ); } if (rt != -1) { socket->state = EVSOCKET_STATE_CONNECTED; } return rt; }
int EVTHREAD_free | ( | EVTHREAD * | thread | ) |
Definition at line 54 of file evthread.c.
{ DLIST_entry *entry, *next; EVTHREAD_OBJECT *obj; DLIST_FOREACH_SAVE( entry, next, &thread->object_list ) { obj = (EVTHREAD_OBJECT *) entry; switch( obj->object_type ) { case EVTHREAD_OBJECT_SOCKET: EVSOCKET_close( (EVSOCKET *) obj ); break; case EVTHREAD_OBJECT_TIMER: EVTIMER_free( (EVTIMER *) obj ); break; } } CTHREAD_free(thread->cthread); free(thread); return 0; }
void EVTHREAD_OBJECT_free | ( | EVTHREAD_OBJECT * | obj | ) |
Definition at line 154 of file evthread.c.
{ DLIST_unlink( &obj->owner->object_list, &obj->entry ); free(obj); }
void EVTHREAD_OBJECT_init | ( | EVTHREAD_OBJECT * | obj, |
int | type, | ||
EVTHREAD * | owner | ||
) |
Definition at line 147 of file evthread.c.
{ DLIST_push_back( &owner->object_list, &obj->entry ); obj->object_type = type; obj->owner = owner; }
static void evthread_proc | ( | VALUES * | ctx | ) | [static] |
Definition at line 124 of file evthread.c.
{ EVTHREAD *thread; VALUES_scan( values, "%p", &thread ); thread->thread_proc( thread, thread->socket, thread->user_context ); // no way to get exit status of the EVTHREAD object. everything is hereby closed. EVTHREAD_free(thread); }
static void socket_cb | ( | int | fd, |
short | event, | ||
void * | ctx | ||
) | [static] |
Definition at line 306 of file evthread.c.
{ EVSOCKET *socket; M_UNUSED( fd ); socket = (EVSOCKET *) ctx; if (event & EV_READ && event & EV_WRITE) { socket->state = EVSOCKET_STATE_ERROR; CTHREAD_resume( socket->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_ERROR ); return; } if (event & EV_READ) { if (socket->state == EVSOCKET_STATE_READING) { MLOG_TRACE( "socket %d read event occured", fd ); CTHREAD_resume( socket->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_EVENT); } else { MLOG_INFO( "socket %d read event ignored !!!", fd ); } } if (event & EV_WRITE) { if (socket->state == EVSOCKET_STATE_CONNECTING || socket->state == EVSOCKET_STATE_WRITING) { MLOG_TRACE( "socket %d write event occured", fd ); CTHREAD_resume( socket->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_EVENT); } else { MLOG_INFO( "socket %d write event ignored !!!", fd ); } } }
static void socket_listener_cb | ( | int | fd, |
short | event, | ||
void * | ctx | ||
) | [static] |
Definition at line 670 of file evthread.c.
{ EVTCPACCEPTOR *acceptor; EVTHREAD_PROC thread_proc; void *thread_ctx; EVTHREAD *thread; EVSOCKET *socket; int sock; M_UNUSED(fd); if (event & EV_READ && event & EV_WRITE) { return; } acceptor = (EVTCPACCEPTOR *) ctx; do { sock = accept( acceptor->fd, 0, 0 ); } while( sock == -1 && errno == EINTR); if (sock == -1) { return; } MLOG_TRACE( "socket %d accepted", sock ); if (acceptor->read_buffer_size != -1) { fd_set_buf_size( sock, Receive_buffer, acceptor->read_buffer_size ); } if (acceptor->send_buffer_size != -1) { fd_set_buf_size( sock, Send_buffer, acceptor->send_buffer_size); } // get thread procedure and thread argument data. if ( acceptor->factory( sock, &thread_proc, &thread_ctx, acceptor->ctx ) ) { close(sock); return; } // create the user thread. thread =EVTHREAD_init( acceptor->loop, thread_proc, thread_ctx); if (!thread) { close(sock); return; } socket = EVSOCKET_init(thread, sock, 1); if (!socket) { //close thread) close(sock); return; } EVTHREAD_start( thread, socket ); }
static void thread_timer_cb | ( | int | fd, |
short | event, | ||
void * | ctx | ||
) | [static] |
Definition at line 135 of file evthread.c.
static void timer_cb | ( | int | fd, |
short | event, | ||
void * | ctx | ||
) | [static] |
Definition at line 223 of file evthread.c.
{ EVTIMER *timer; M_UNUSED(fd); M_UNUSED(event); timer = (EVTIMER *) ctx; event_del( &timer->timer_event ); timer->state = EVTIMER_STATE_INIT; MLOG_TRACE( "Timer %p event occured", &timer->timer_event ); CTHREAD_resume( timer->object_base.owner->cthread , 0, "%d", timer->timer_id ); }