Simple coroutine library integrated with IO event loop (libevent) / objects in plain C Snapshot
Defines | Functions
evthread.c File Reference
#include <evthread.h>
#include <errno.h>
#include <unistd.h>
#include <sys/time.h>
#include <nutils/ioutils.h>
#include <butils/logg.h>

Go to the source code of this file.

Defines

#define EVENT_ID_HAS_IO_EVENT   0
#define EVENT_ID_HAS_IO_ERROR   1
#define TIMER_ID_COMM_TIMEOUT   2
#define TIMER_ID_IDLE_TIMEOUT   3

Functions

EVLOOPEVLOOP_init (STACKS *stacks)
int EVLOOP_run (EVLOOP *loop)
int EVLOOP_break (EVLOOP *loop)
int EVTHREAD_free (EVTHREAD *thread)
static void evthread_proc (VALUES *ctx)
static void thread_timer_cb (int fd, short event, void *ctx)
EVTHREADEVTHREAD_init (EVLOOP *loop, EVTHREAD_PROC thread_proc, void *user_ctx)
int EVTHREAD_start (EVTHREAD *thread, struct tagEVSOCKET *socket)
int EVTHREAD_delay (EVTHREAD *thread, struct timeval delay)
void EVTHREAD_OBJECT_init (EVTHREAD_OBJECT *obj, int type, EVTHREAD *owner)
void EVTHREAD_OBJECT_free (EVTHREAD_OBJECT *obj)
static void timer_cb (int fd, short event, void *ctx)
EVTIMEREVTIMER_init (EVTHREAD *thread, int timer_id, struct timeval tm)
int EVTIMER_start (EVTIMER *ret)
int EVTIMER_cancel (EVTIMER *timer)
int EVTIMER_free (EVTIMER *timer)
static void socket_cb (int fd, short event, void *ctx)
EVSOCKETEVSOCKET_init (EVTHREAD *thread, int fd, int is_connected)
int EVSOCKET_close (EVSOCKET *socket)
void EVSOCKET_set_idle_timeout (EVSOCKET *socket, struct timeval timeout)
int EVSOCKET_connect (EVSOCKET *socket, struct sockaddr *address, socklen_t socklen, struct timeval timeout)
static int EVSOCKET_recv_internal (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout)
static int EVSOCKET_send_internal (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout)
int EVSOCKET_recv (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout)
int EVSOCKET_recv_all (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout)
int EVSOCKET_send (EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout)
static void socket_listener_cb (int fd, short event, void *ctx)
EVTCPACCEPTOREVTCPACCEPTOR_init_ex (EVLOOP *loop, SOCKADDR *addr, int listener_backlog, EVTHREAD_FACTORY factory, int read_buffer_size, int send_buffer_size, void *ctx)
EVTCPACCEPTOREVTCPACCEPTOR_init (EVLOOP *loop, int fd, EVTHREAD_FACTORY factory, int read_buffer_size, int send_buffer_size, void *ctx)
void EVTCPACCEPTOR_close (EVTCPACCEPTOR *acceptor)
void custom_timeout_handling (struct event_base *base, struct timeval *tv)

Define Documentation

#define EVENT_ID_HAS_IO_ERROR   1

Definition at line 13 of file evthread.c.

#define EVENT_ID_HAS_IO_EVENT   0

Definition at line 12 of file evthread.c.

#define TIMER_ID_COMM_TIMEOUT   2

Definition at line 14 of file evthread.c.

#define TIMER_ID_IDLE_TIMEOUT   3

Definition at line 15 of file evthread.c.


Function Documentation

void custom_timeout_handling ( struct event_base *  base,
struct timeval *  tv 
)

Definition at line 731 of file evthread.c.

{
   M_UNUSED(base);
   M_UNUSED(tv);
}
static int EVSOCKET_recv_internal ( EVSOCKET socket,
void *  buf,
size_t  buf_size,
int  flags,
struct timeval  timeout 
) [static]

Definition at line 414 of file evthread.c.

{
   int rt;
   int has_event = 0;
   VALUES *rvalues;
   int event_id;

   if ( socket->state != EVSOCKET_STATE_CONNECTED) {
     return -1;
   }
   
r_again:   
   do {
     rt = recv( socket->fd, buf, buf_size, flags);
   } while(rt == -1 && errno == EINTR);

   if (rt == -1) {
     if (errno == EAGAIN) {

       MLOG_TRACE( "socket %d read has blocked", socket->fd ); 
       
       socket->state = EVSOCKET_STATE_READING;
       
       socket->timer_io_timeout = EVTIMER_init( socket->thread, TIMER_ID_COMM_TIMEOUT, timeout );
       if (socket->timer_io_timeout) {  
         EVTIMER_start(socket->timer_io_timeout);       
       }

       if (!has_event) {
         event_add( &socket->read_event, 0 );
         has_event = 1;
       }

       CTHREAD_yield( &rvalues, 0 );
       VALUES_scan( rvalues, "%d", &event_id );
 
       if (socket->timer_io_timeout) {  
         EVTIMER_free(socket->timer_io_timeout);        
         socket->timer_io_timeout = 0; 
       }
      
       switch(event_id) {
         case EVENT_ID_HAS_IO_EVENT:
           MLOG_TRACE( "socket %d received read event", socket->fd ); 
           goto r_again;
         default: 
           MLOG_DEBUG( "socket %d read timed out", socket->fd ); 
           socket->state = EVSOCKET_STATE_ERROR;
           close( socket->fd );
           socket->fd = -1;
           rt = -1;
           break;
       }
     } else {
       MLOG_DEBUG( "socket %d read error. errno %d", socket->fd, errno );
     }
     return -1;
   }

   if (has_event) {
     event_del( &socket->read_event );
   }
   
   if (rt != -1) {
     socket->state = EVSOCKET_STATE_CONNECTED;
   } 
   
   if (rt >= 0) {
     if (socket->timer_idle_timeout) { 
       EVTIMER_cancel(socket->timer_idle_timeout);
     }
   }


   return rt;
} 
static int EVSOCKET_send_internal ( EVSOCKET socket,
void *  buf,
size_t  buf_size,
int  flags,
struct timeval  timeout 
) [static]

Definition at line 491 of file evthread.c.

{
   int rt;
   int has_event = 0;
   VALUES *rvalues;
   int event_id;


   if ( socket->state != EVSOCKET_STATE_CONNECTED) {
     return -1;
   }
   
w_again:   
   do {
    rt = send( socket->fd, buf, buf_size, flags);
   } while(rt == -1 && errno == EINTR);

   if (rt == -1) {
     if (errno == EAGAIN) {

       socket->state = EVSOCKET_STATE_WRITING;
       
       socket->timer_io_timeout = EVTIMER_init( socket->thread, TIMER_ID_COMM_TIMEOUT, timeout );
       if (socket->timer_io_timeout) {
         EVTIMER_start(socket->timer_io_timeout);       
       }

       if (!has_event) {
         event_add( &socket->write_event, 0 );
         has_event = 1;
       }

       CTHREAD_yield( &rvalues, 0 );
       VALUES_scan( rvalues, "%d", &event_id );
 
       if (socket->timer_io_timeout) {  
         EVTIMER_free(socket->timer_io_timeout);        
         socket->timer_io_timeout = 0; 
       }
      
       switch(event_id) {
         case EVENT_ID_HAS_IO_EVENT:
           MLOG_TRACE( "socket %d received write event", socket->fd ); 
           goto w_again;
         default: 
           MLOG_DEBUG( "socket %d write timed out", socket->fd ); 
           socket->state = EVSOCKET_STATE_ERROR;
           close( socket->fd );
           socket->fd = -1;
           rt = -1;
           break;
       }
     }
   }
  
   if (has_event) {
     event_del( &socket->write_event );
   }  

   if (rt != -1) {
     socket->state = EVSOCKET_STATE_CONNECTED;
   } 


   return rt;
} 
int EVTHREAD_free ( EVTHREAD thread)

Definition at line 54 of file evthread.c.

{
   DLIST_entry *entry, *next;
   EVTHREAD_OBJECT *obj;

   DLIST_FOREACH_SAVE( entry, next,  &thread->object_list ) {
     obj = (EVTHREAD_OBJECT *) entry;
     switch( obj->object_type ) {
       case EVTHREAD_OBJECT_SOCKET:
         EVSOCKET_close( (EVSOCKET *) obj );    
         break;
       case EVTHREAD_OBJECT_TIMER:
         EVTIMER_free( (EVTIMER *) obj );    
         break;
     }
   }

   CTHREAD_free(thread->cthread);
   free(thread);
   return 0;
}
void EVTHREAD_OBJECT_free ( EVTHREAD_OBJECT obj)

Definition at line 154 of file evthread.c.

{
  DLIST_unlink( &obj->owner->object_list, &obj->entry );
  free(obj);
}
void EVTHREAD_OBJECT_init ( EVTHREAD_OBJECT obj,
int  type,
EVTHREAD owner 
)

Definition at line 147 of file evthread.c.

{
  DLIST_push_back( &owner->object_list, &obj->entry );
  obj->object_type = type;
  obj->owner = owner;
}
static void evthread_proc ( VALUES *  ctx) [static]

Definition at line 124 of file evthread.c.

{
  EVTHREAD *thread;

  VALUES_scan( values, "%p", &thread );
  thread->thread_proc( thread, thread->socket, thread->user_context );

  // no way to get exit status of the EVTHREAD object. everything is hereby closed.
  EVTHREAD_free(thread);
}
static void socket_cb ( int  fd,
short  event,
void *  ctx 
) [static]

Definition at line 306 of file evthread.c.

{
   EVSOCKET *socket; 

   M_UNUSED( fd );
   
   socket = (EVSOCKET *) ctx;

   if (event & EV_READ && event & EV_WRITE) {
     socket->state = EVSOCKET_STATE_ERROR;
     CTHREAD_resume( socket->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_ERROR );
     return;
   }

   if (event & EV_READ) {
     if (socket->state == EVSOCKET_STATE_READING) {
        MLOG_TRACE( "socket %d read event occured", fd ); 
        CTHREAD_resume( socket->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_EVENT);
     } else {
        MLOG_INFO( "socket %d read event ignored !!!", fd ); 
     }
   }
  
   if (event & EV_WRITE) {
     if (socket->state == EVSOCKET_STATE_CONNECTING || socket->state == EVSOCKET_STATE_WRITING) {
        MLOG_TRACE( "socket %d write event occured", fd ); 
        CTHREAD_resume( socket->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_EVENT);
     } else {
        MLOG_INFO( "socket %d write event ignored !!!", fd ); 
     }
   }
}
static void socket_listener_cb ( int  fd,
short  event,
void *  ctx 
) [static]

Definition at line 670 of file evthread.c.

{
   EVTCPACCEPTOR *acceptor;
   EVTHREAD_PROC thread_proc;
   void *thread_ctx;
   EVTHREAD *thread; 
   EVSOCKET *socket;
   int sock;

   M_UNUSED(fd);

   if (event & EV_READ && event & EV_WRITE) {
      return;
   }

   acceptor = (EVTCPACCEPTOR *) ctx;
 
   do {
     sock = accept( acceptor->fd, 0, 0 );
   } while( sock == -1 && errno == EINTR);

   if (sock == -1) {
     return;
   }

   MLOG_TRACE( "socket %d accepted", sock ); 
 
   
   if (acceptor->read_buffer_size != -1) {
     fd_set_buf_size( sock, Receive_buffer, acceptor->read_buffer_size );
   }

   if (acceptor->send_buffer_size != -1) {
     fd_set_buf_size( sock, Send_buffer, acceptor->send_buffer_size);
   }
  
   // get thread procedure and thread argument data.
   if ( acceptor->factory( sock, &thread_proc, &thread_ctx, acceptor->ctx ) ) {
     close(sock);
     return;
   }


   // create the user thread.
   thread =EVTHREAD_init( acceptor->loop, thread_proc, thread_ctx);
   if (!thread) {
     close(sock);
     return;
   }

   socket = EVSOCKET_init(thread, sock, 1);
   if (!socket) {
     //close thread)
     close(sock);
     return;
   }

   EVTHREAD_start( thread, socket );

}
static void thread_timer_cb ( int  fd,
short  event,
void *  ctx 
) [static]

Definition at line 135 of file evthread.c.

{
  EVTHREAD *thread = (EVTHREAD *) ctx;
 
  M_UNUSED(fd);
  M_UNUSED(event);
    
  CTHREAD_resume( thread->cthread, 0, 0 );
}
static void timer_cb ( int  fd,
short  event,
void *  ctx 
) [static]

Definition at line 223 of file evthread.c.

{
   EVTIMER *timer;

   M_UNUSED(fd);
   M_UNUSED(event);

   timer = (EVTIMER *) ctx;

   event_del( &timer->timer_event );
   timer->state = EVTIMER_STATE_INIT;
   
   MLOG_TRACE( "Timer %p event occured", &timer->timer_event ); 
   CTHREAD_resume( timer->object_base.owner->cthread , 0, "%d", timer->timer_id );
}