|
Simple coroutine library integrated with IO event loop (libevent) / objects in plain C Snapshot
|
#include <evthread.h>
#include <errno.h>
#include <unistd.h>
#include <sys/time.h>
#include <nutils/ioutils.h>
#include <butils/logg.h>

Go to the source code of this file.
Defines | |
| #define | EVENT_ID_HAS_IO_EVENT 0 |
| #define | EVENT_ID_HAS_IO_ERROR 1 |
| #define | TIMER_ID_COMM_TIMEOUT 2 |
| #define | TIMER_ID_IDLE_TIMEOUT 3 |
Functions | |
| EVLOOP * | EVLOOP_init (STACKS *stacks) |
| int | EVLOOP_run (EVLOOP *loop) |
| int | EVLOOP_break (EVLOOP *loop) |
| int | EVTHREAD_free (EVTHREAD *thread) |
| static void | evthread_proc (VALUES *ctx) |
| static void | thread_timer_cb (int fd, short event, void *ctx) |
| EVTHREAD * | EVTHREAD_init (EVLOOP *loop, EVTHREAD_PROC thread_proc, void *user_ctx) |
| int | EVTHREAD_start (EVTHREAD *thread, struct tagEVSOCKET *socket) |
| int | EVTHREAD_delay (EVTHREAD *thread, struct timeval delay) |
| void | EVTHREAD_OBJECT_init (EVTHREAD_OBJECT *obj, int type, EVTHREAD *owner) |
| void | EVTHREAD_OBJECT_free (EVTHREAD_OBJECT *obj) |
| static void | timer_cb (int fd, short event, void *ctx) |
| EVTIMER * | EVTIMER_init (EVTHREAD *thread, int timer_id, struct timeval tm) |
| int | EVTIMER_start (EVTIMER *ret) |
| int | EVTIMER_cancel (EVTIMER *timer) |
| int | EVTIMER_free (EVTIMER *timer) |
| static void | socket_cb (int fd, short event, void *ctx) |
| EVSOCKET * | EVSOCKET_init (EVTHREAD *thread, int fd, int is_connected) |
| int | EVSOCKET_close (EVSOCKET *socket) |
| void | EVSOCKET_set_idle_timeout (EVSOCKET *socket, struct timeval timeout) |
| int | EVSOCKET_connect (EVSOCKET *socket, struct sockaddr *address, socklen_t socklen, struct timeval timeout) |
| static int | EVSOCKET_recv_internal (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout) |
| static int | EVSOCKET_send_internal (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout) |
| int | EVSOCKET_recv (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout) |
| int | EVSOCKET_recv_all (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout) |
| int | EVSOCKET_send (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout) |
| static void | socket_listener_cb (int fd, short event, void *ctx) |
| EVTCPACCEPTOR * | EVTCPACCEPTOR_init_ex (EVLOOP *loop, SOCKADDR *addr, int listener_backlog, EVTHREAD_FACTORY factory, int read_buffer_size, int send_buffer_size, void *ctx) |
| EVTCPACCEPTOR * | EVTCPACCEPTOR_init (EVLOOP *loop, int fd, EVTHREAD_FACTORY factory, int read_buffer_size, int send_buffer_size, void *ctx) |
| void | EVTCPACCEPTOR_close (EVTCPACCEPTOR *acceptor) |
| void | custom_timeout_handling (struct event_base *base, struct timeval *tv) |
| #define EVENT_ID_HAS_IO_ERROR 1 |
Definition at line 13 of file evthread.c.
| #define EVENT_ID_HAS_IO_EVENT 0 |
Definition at line 12 of file evthread.c.
| #define TIMER_ID_COMM_TIMEOUT 2 |
Definition at line 14 of file evthread.c.
| #define TIMER_ID_IDLE_TIMEOUT 3 |
Definition at line 15 of file evthread.c.
| void custom_timeout_handling | ( | struct event_base * | base, |
| struct timeval * | tv | ||
| ) |
Definition at line 731 of file evthread.c.
{
    /* Intentionally empty: both arguments are only marked as unused. */
    M_UNUSED(tv);
    M_UNUSED(base);
}
| static int EVSOCKET_recv_internal | ( | EVSOCKET * | socket, |
| void * | buf, | ||
| size_t | buf_size, | ||
| int | flags, | ||
| struct timeval | timeout | ||
| ) | [static] |
Definition at line 414 of file evthread.c.
{
    /*
     * Receive data from `socket` into `buf` (at most `buf_size` bytes),
     * yielding the calling coroutine while recv() would block.
     *
     * Returns the value of recv() on success, or -1 when
     *   - the socket is not in EVSOCKET_STATE_CONNECTED on entry,
     *   - recv() fails with an error other than EINTR/EAGAIN/EWOULDBLOCK,
     *   - the coroutine is resumed with anything other than
     *     EVENT_ID_HAS_IO_EVENT; in that case the descriptor is closed and
     *     the socket is moved to EVSOCKET_STATE_ERROR.
     *
     * While blocked, a TIMER_ID_COMM_TIMEOUT timer built from `timeout` is
     * armed; it is released again as soon as the coroutine is resumed.
     */
    int rt;
    int has_event = 0;   /* non-zero once read_event was handed to event_add */
    int saved_errno;
    VALUES *rvalues;
    int event_id;

    if (socket->state != EVSOCKET_STATE_CONNECTED) {
        return -1;
    }

r_again:
    /* Retry transparently when the call is interrupted by a signal. */
    do {
        rt = recv(socket->fd, buf, buf_size, flags);
    } while (rt == -1 && errno == EINTR);

    if (rt == -1) {
        /* POSIX allows either value for "would block"; accept both. */
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            MLOG_TRACE( "socket %d read has blocked", socket->fd );
            socket->state = EVSOCKET_STATE_READING;

            socket->timer_io_timeout = EVTIMER_init(socket->thread, TIMER_ID_COMM_TIMEOUT, timeout);
            if (socket->timer_io_timeout) {
                EVTIMER_start(socket->timer_io_timeout);
            }

            /* Register the read event only once per call. */
            if (!has_event) {
                event_add(&socket->read_event, 0);
                has_event = 1;
            }

            /* Suspend until socket_cb or a timer callback resumes us. */
            CTHREAD_yield(&rvalues, 0);
            VALUES_scan(rvalues, "%d", &event_id);

            if (socket->timer_io_timeout) {
                EVTIMER_free(socket->timer_io_timeout);
                socket->timer_io_timeout = 0;
            }

            switch (event_id) {
            case EVENT_ID_HAS_IO_EVENT:
                MLOG_TRACE( "socket %d received read event", socket->fd );
                goto r_again;
            default:
                MLOG_DEBUG( "socket %d read timed out", socket->fd );
                /*
                 * Unregister the read event BEFORE closing the descriptor:
                 * the event loop must not keep an event that refers to a
                 * closed (and possibly reused) fd. The original returned
                 * here with the event still registered.
                 */
                event_del(&socket->read_event);
                socket->state = EVSOCKET_STATE_ERROR;
                close(socket->fd);
                socket->fd = -1;
                return -1;
            }
        }

        /* Hard error: report it and leave no event registered behind. */
        saved_errno = errno;
        MLOG_DEBUG( "socket %d read error. errno %d", socket->fd, errno );
        if (has_event) {
            event_del(&socket->read_event);
        }
        errno = saved_errno;   /* keep the recv() error visible to the caller */
        return -1;
    }

    /* recv() succeeded (rt >= 0). */
    if (has_event) {
        event_del(&socket->read_event);
    }
    socket->state = EVSOCKET_STATE_CONNECTED;
    if (socket->timer_idle_timeout) {
        EVTIMER_cancel(socket->timer_idle_timeout);
    }
    return rt;
}
| static int EVSOCKET_send_internal | ( | EVSOCKET * | socket, |
| void * | buf, | ||
| size_t | buf_size, | ||
| int | flags, | ||
| struct timeval | timeout | ||
| ) | [static] |
Definition at line 491 of file evthread.c.
{
    /*
     * Send `buf_size` bytes from `buf` on `socket`, yielding the calling
     * coroutine while send() would block.
     *
     * Returns the value of send() on success, or -1 when
     *   - the socket is not in EVSOCKET_STATE_CONNECTED on entry,
     *   - send() fails with an error other than EINTR/EAGAIN/EWOULDBLOCK,
     *   - the coroutine is resumed with anything other than
     *     EVENT_ID_HAS_IO_EVENT; in that case the descriptor is closed and
     *     the socket is moved to EVSOCKET_STATE_ERROR.
     *
     * While blocked, a TIMER_ID_COMM_TIMEOUT timer built from `timeout` is
     * armed; it is released again as soon as the coroutine is resumed.
     */
    int rt;
    int has_event = 0;   /* non-zero once write_event was handed to event_add */
    int saved_errno;
    VALUES *rvalues;
    int event_id;

    if (socket->state != EVSOCKET_STATE_CONNECTED) {
        return -1;
    }

w_again:
    /* Retry transparently when the call is interrupted by a signal. */
    do {
        rt = send(socket->fd, buf, buf_size, flags);
    } while (rt == -1 && errno == EINTR);

    if (rt == -1) {
        /* POSIX allows either value for "would block"; accept both. */
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            socket->state = EVSOCKET_STATE_WRITING;

            socket->timer_io_timeout = EVTIMER_init(socket->thread, TIMER_ID_COMM_TIMEOUT, timeout);
            if (socket->timer_io_timeout) {
                EVTIMER_start(socket->timer_io_timeout);
            }

            /* Register the write event only once per call. */
            if (!has_event) {
                event_add(&socket->write_event, 0);
                has_event = 1;
            }

            /* Suspend until socket_cb or a timer callback resumes us. */
            CTHREAD_yield(&rvalues, 0);
            VALUES_scan(rvalues, "%d", &event_id);

            if (socket->timer_io_timeout) {
                EVTIMER_free(socket->timer_io_timeout);
                socket->timer_io_timeout = 0;
            }

            switch (event_id) {
            case EVENT_ID_HAS_IO_EVENT:
                MLOG_TRACE( "socket %d received write event", socket->fd );
                goto w_again;
            default:
                MLOG_DEBUG( "socket %d write timed out", socket->fd );
                /*
                 * Unregister the write event BEFORE closing the descriptor;
                 * the original closed the fd first and removed the event
                 * afterwards.
                 */
                event_del(&socket->write_event);
                socket->state = EVSOCKET_STATE_ERROR;
                close(socket->fd);
                socket->fd = -1;
                return -1;
            }
        }

        /* Hard error: log it (as the receive path does) and clean up. */
        saved_errno = errno;
        MLOG_DEBUG( "socket %d write error. errno %d", socket->fd, errno );
        if (has_event) {
            event_del(&socket->write_event);
        }
        errno = saved_errno;   /* keep the send() error visible to the caller */
        return -1;
    }

    /* send() succeeded. */
    if (has_event) {
        event_del(&socket->write_event);
    }
    socket->state = EVSOCKET_STATE_CONNECTED;
    return rt;
}
| int EVTHREAD_free | ( | EVTHREAD * | thread | ) |
Definition at line 54 of file evthread.c.
{
    /*
     * Release every object still linked into the thread's object list
     * (sockets are closed, timers are freed), then free the coroutine and
     * the EVTHREAD structure itself. Always returns 0.
     */
    DLIST_entry *cur, *following;
    EVTHREAD_OBJECT *item;
    int kind;

    /* The "save" iteration keeps the successor, so the current entry may be released. */
    DLIST_FOREACH_SAVE( cur, following, &thread->object_list ) {
        item = (EVTHREAD_OBJECT *) cur;
        kind = item->object_type;

        if (kind == EVTHREAD_OBJECT_SOCKET) {
            EVSOCKET_close( (EVSOCKET *) item );
        } else if (kind == EVTHREAD_OBJECT_TIMER) {
            EVTIMER_free( (EVTIMER *) item );
        }
    }

    CTHREAD_free(thread->cthread);
    free(thread);
    return 0;
}
| void EVTHREAD_OBJECT_free | ( | EVTHREAD_OBJECT * | obj | ) |
Definition at line 154 of file evthread.c.
{
    /* Detach the object from its owner's object list, then release its memory. */
    EVTHREAD *holder = obj->owner;

    DLIST_unlink( &holder->object_list, &obj->entry );
    free(obj);
}
| void EVTHREAD_OBJECT_init | ( | EVTHREAD_OBJECT * | obj, |
| int | type, | ||
| EVTHREAD * | owner | ||
| ) |
Definition at line 147 of file evthread.c.
{
    /* Record the object's type and owner, and append it to the owner's object list. */
    obj->owner = owner;
    obj->object_type = type;
    DLIST_push_back( &owner->object_list, &obj->entry );
}
| static void evthread_proc | ( | VALUES * | ctx | ) | [static] |
Definition at line 124 of file evthread.c.
{
    /*
     * Coroutine entry point: extracts the EVTHREAD pointer from the argument
     * values, runs the user supplied thread procedure and then releases the
     * thread.
     *
     * NOTE(review): the rendered signature names the parameter `ctx` while
     * the body reads `values` - presumably a declaration/definition naming
     * difference; confirm against the real source file.
     */
    EVTHREAD *self;

    VALUES_scan( values, "%p", &self );

    self->thread_proc( self, self->socket, self->user_context );

    /* The thread procedure's exit status cannot be retrieved; everything is closed here. */
    EVTHREAD_free(self);
}
| static void socket_cb | ( | int | fd, |
| short | event, | ||
| void * | ctx | ||
| ) | [static] |
Definition at line 306 of file evthread.c.
{
    /*
     * Event callback for a socket's read/write events.
     *
     * - Read and write reported together: the socket is put into
     *   EVSOCKET_STATE_ERROR and its coroutine is resumed with
     *   EVENT_ID_HAS_IO_ERROR.
     * - Read only: the coroutine is resumed with EVENT_ID_HAS_IO_EVENT if the
     *   socket is in EVSOCKET_STATE_READING; otherwise the event is ignored.
     * - Write only: same, for EVSOCKET_STATE_CONNECTING / EVSOCKET_STATE_WRITING.
     */
    EVSOCKET *sock = (EVSOCKET *) ctx;
    int readable;
    int writable;

    M_UNUSED( fd );

    readable = (event & EV_READ) != 0;
    writable = (event & EV_WRITE) != 0;

    if (readable && writable) {
        sock->state = EVSOCKET_STATE_ERROR;
        CTHREAD_resume( sock->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_ERROR );
        return;
    }

    /* From here on at most one of the two flags is set. */
    if (readable) {
        if (sock->state != EVSOCKET_STATE_READING) {
            MLOG_INFO( "socket %d read event ignored !!!", fd );
        } else {
            MLOG_TRACE( "socket %d read event occured", fd );
            CTHREAD_resume( sock->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_EVENT);
        }
    } else if (writable) {
        if (sock->state != EVSOCKET_STATE_CONNECTING && sock->state != EVSOCKET_STATE_WRITING) {
            MLOG_INFO( "socket %d write event ignored !!!", fd );
        } else {
            MLOG_TRACE( "socket %d write event occured", fd );
            CTHREAD_resume( sock->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_EVENT);
        }
    }
}
| static void socket_listener_cb | ( | int | fd, |
| short | event, | ||
| void * | ctx | ||
| ) | [static] |
Definition at line 670 of file evthread.c.
{
    /*
     * Event callback for the listening socket of an EVTCPACCEPTOR.
     *
     * Accepts one connection, optionally applies the acceptor's receive/send
     * buffer sizes, asks the acceptor's factory for the thread procedure and
     * its context, creates the EVTHREAD and its EVSOCKET, and starts the
     * thread. On any failure the accepted descriptor is closed and nothing
     * is started.
     */
    EVTCPACCEPTOR *acceptor;
    EVTHREAD_PROC thread_proc;
    void *thread_ctx;
    EVTHREAD *thread;
    EVSOCKET *socket;
    int sock;

    M_UNUSED(fd);

    /* Read and write reported together is not a plain "connection pending" event. */
    if (event & EV_READ && event & EV_WRITE) {
        return;
    }

    acceptor = (EVTCPACCEPTOR *) ctx;

    /* Retry transparently when accept() is interrupted by a signal. */
    do {
        sock = accept( acceptor->fd, 0, 0 );
    } while( sock == -1 && errno == EINTR);

    if (sock == -1) {
        return;
    }

    MLOG_TRACE( "socket %d accepted", sock );

    /* -1 means "leave the buffer size unchanged". */
    if (acceptor->read_buffer_size != -1) {
        fd_set_buf_size( sock, Receive_buffer, acceptor->read_buffer_size );
    }
    if (acceptor->send_buffer_size != -1) {
        fd_set_buf_size( sock, Send_buffer, acceptor->send_buffer_size);
    }

    /* Get thread procedure and thread argument data; non-zero means refusal. */
    if ( acceptor->factory( sock, &thread_proc, &thread_ctx, acceptor->ctx ) ) {
        close(sock);
        return;
    }

    /* Create the user thread. */
    thread = EVTHREAD_init( acceptor->loop, thread_proc, thread_ctx);
    if (!thread) {
        close(sock);
        return;
    }

    socket = EVSOCKET_init(thread, sock, 1);
    if (!socket) {
        /*
         * The thread was created but never started: release it here,
         * otherwise it is leaked (the original only carried a TODO).
         * NOTE(review): assumes EVTHREAD_free is valid for a thread that
         * was initialised but not started - confirm against EVTHREAD_init.
         */
        EVTHREAD_free(thread);
        close(sock);
        return;
    }

    /* NOTE(review): the result of EVTHREAD_start is not checked - confirm whether it can fail here. */
    EVTHREAD_start( thread, socket );
}
| static void thread_timer_cb | ( | int | fd, |
| short | event, | ||
| void * | ctx | ||
| ) | [static] |
Definition at line 135 of file evthread.c.
| static void timer_cb | ( | int | fd, |
| short | event, | ||
| void * | ctx | ||
| ) | [static] |
Definition at line 223 of file evthread.c.
{
    /*
     * Event callback for an EVTIMER: removes the timer event, puts the timer
     * back into EVTIMER_STATE_INIT and resumes the owning thread's coroutine,
     * passing the timer id as the resume value.
     */
    EVTIMER *fired = (EVTIMER *) ctx;

    M_UNUSED(fd);
    M_UNUSED(event);

    event_del( &fired->timer_event );
    fired->state = EVTIMER_STATE_INIT;

    MLOG_TRACE( "Timer %p event occured", &fired->timer_event );

    CTHREAD_resume( fired->object_base.owner->cthread , 0, "%d", fired->timer_id );
}
1.7.4