Simple coroutine library integrated with IO event loop (libevent) / objects in plain C Snapshot
evthread.c
Go to the documentation of this file.
00001 #include <evthread.h>
00002 #include <errno.h>
00003 #include <unistd.h>
00004 #include <sys/time.h>
00005 #include <nutils/ioutils.h>
00006 #include <butils/logg.h>
00007 
00008 
00009 
00010 // ---------------------------------------------------------------------------
00011 
00012 #define EVENT_ID_HAS_IO_EVENT  0 
00013 #define EVENT_ID_HAS_IO_ERROR  1
00014 #define TIMER_ID_COMM_TIMEOUT  2
00015 #define TIMER_ID_IDLE_TIMEOUT  3
00016 
00017 // ---------------------------------------------------------------------------
00018 
00019 EVLOOP * EVLOOP_init(STACKS *stacks )
00020 {
00021   EVLOOP *loop;
00022 
00023   loop = (EVLOOP *)  malloc( sizeof( EVLOOP ) );
00024   if (!loop) {
00025     return 0;
00026   }
00027 
00028   if (!stacks) {
00029     return 0;
00030   }
00031   loop->stacks = stacks;
00032  
00033   disable_sigpipe();
00034 
00035   loop->ev_base = event_init();
00036   if (!loop->ev_base) {
00037     return 0;
00038   }
00039 
00040   return loop;
00041 }
00042 
00043 int EVLOOP_run( EVLOOP *loop )
00044 {
00045   return event_base_dispatch( loop->ev_base );
00046 }
00047 
00048 int EVLOOP_break( EVLOOP *loop )
00049 {
00050   return  event_base_loopbreak( loop->ev_base ); 
00051 }
00052 
00053 // ---------------------------------------------------------------------------
00054 int EVTHREAD_free( EVTHREAD *thread )
00055 {
00056    DLIST_entry *entry, *next;
00057    EVTHREAD_OBJECT *obj;
00058 
00059    DLIST_FOREACH_SAVE( entry, next,  &thread->object_list ) {
00060      obj = (EVTHREAD_OBJECT *) entry;
00061      switch( obj->object_type ) {
00062        case EVTHREAD_OBJECT_SOCKET:
00063          EVSOCKET_close( (EVSOCKET *) obj );    
00064          break;
00065        case EVTHREAD_OBJECT_TIMER:
00066          EVTIMER_free( (EVTIMER *) obj );    
00067          break;
00068      }
00069    }
00070 
00071    CTHREAD_free(thread->cthread);
00072    free(thread);
00073    return 0;
00074 }
00075 
00076 static void evthread_proc( VALUES *ctx );
00077 static void thread_timer_cb( int fd, short event, void *ctx);
00078 
00079 EVTHREAD *EVTHREAD_init(EVLOOP *loop, EVTHREAD_PROC thread_proc,  void *user_ctx)
00080 {
00081   EVTHREAD *thread;
00082 
00083   thread = (EVTHREAD *) malloc( sizeof( EVTHREAD ) );
00084   if (!thread) {
00085     return 0;
00086   }
00087   thread->loop = loop;
00088   thread->thread_proc = thread_proc;
00089   thread->user_context = user_ctx;
00090 
00091   thread->cthread = CTHREAD_init( loop->stacks, evthread_proc );
00092   if (!thread->cthread) {
00093     free(thread);
00094     return 0;
00095   }
00096   thread->socket = 0;
00097 
00098   DLIST_init( &thread->object_list );
00099 
00100   return thread;
00101 }
00102 
00103 int EVTHREAD_start( EVTHREAD *thread, struct tagEVSOCKET *socket )
00104 {
00105   thread->socket = socket;
00106   CTHREAD_start( thread->cthread, 0, "%p", thread );
00107   return 0;
00108 }
00109 
00110 int EVTHREAD_delay( EVTHREAD *thread, struct timeval delay )
00111 {
00112   event_set( &thread->timer_event, -1, 0, thread_timer_cb, (void *) thread );
00113   event_base_set( thread->loop->ev_base, &thread->timer_event );
00114   event_add( &thread->timer_event, &delay );
00115    
00116   CTHREAD_yield( 0, 0);
00117    
00118   event_del( &thread->timer_event );
00119 
00120  
00121   return 0;
00122 }
00123 
00124 static void evthread_proc( VALUES *values )
00125 {
00126   EVTHREAD *thread;
00127 
00128   VALUES_scan( values, "%p", &thread );
00129   thread->thread_proc( thread, thread->socket, thread->user_context );
00130 
00131   // no way to get exit status of the EVTHREAD object. everything is hereby closed.
00132   EVTHREAD_free(thread);
00133 }
00134 
00135 static void thread_timer_cb( int fd, short event, void *ctx)
00136 {
00137   EVTHREAD *thread = (EVTHREAD *) ctx;
00138  
00139   M_UNUSED(fd);
00140   M_UNUSED(event);
00141     
00142   CTHREAD_resume( thread->cthread, 0, 0 );
00143 }
00144 
00145 
00146 // ---------------------------------------------------------------------------
00147 void EVTHREAD_OBJECT_init(EVTHREAD_OBJECT *obj, int type, EVTHREAD *owner)
00148 {
00149   DLIST_push_back( &owner->object_list, &obj->entry );
00150   obj->object_type = type;
00151   obj->owner = owner;
00152 }
00153 
00154 void EVTHREAD_OBJECT_free(EVTHREAD_OBJECT *obj)
00155 {
00156   DLIST_unlink( &obj->owner->object_list, &obj->entry );
00157   free(obj);
00158 }
00159 
00160 // ---------------------------------------------------------------------------
00161 
00162 static void timer_cb( int fd, short event, void *ctx);
00163 
00164 
00165 EVTIMER *EVTIMER_init(EVTHREAD *thread, int timer_id, struct timeval tm )
00166 {
00167   EVTIMER *ret;
00168  
00169   ret = (EVTIMER *) malloc( sizeof( EVTIMER ) );
00170   if (!ret) {
00171     return 0;
00172   }
00173 
00174   ret->loop = thread->loop;
00175   ret->timer_id = timer_id;
00176   ret->tm = tm;
00177   ret->state = EVTIMER_STATE_INIT;
00178  
00179   EVTHREAD_OBJECT_init( &ret->object_base, EVTHREAD_OBJECT_TIMER , thread );
00180  
00181   return ret;
00182 }
00183 
00184 
00185 int EVTIMER_start( EVTIMER *ret)
00186 {
00187   if (ret->state != EVTIMER_STATE_INIT) {
00188     return -1;
00189   }
00190 
00191   ret->state = EVTIMER_STATE_SCHEDULED;
00192   
00193   event_set( &ret->timer_event, -1, 0, timer_cb, (void *) ret );
00194   event_base_set( ret->loop->ev_base, &ret->timer_event );
00195   event_add( &ret->timer_event, &ret->tm );
00196 
00197   MLOG_TRACE( "Timer %p started %ld:%ld", &ret->timer_event, ret->tm.tv_sec, ret->tm.tv_usec ); 
00198   return 0;
00199 }
00200 
00201 int  EVTIMER_cancel( EVTIMER *timer )
00202 {
00203   if (timer->state != EVTIMER_STATE_SCHEDULED) {
00204     return -1;
00205   }
00206   
00207   MLOG_TRACE( "Timer %p canceled", &timer->timer_event ); 
00208 
00209   event_del( &timer->timer_event );
00210   timer->state = EVTIMER_STATE_INIT;
00211   return 0;
00212 }
00213 
00214 int  EVTIMER_free( EVTIMER *timer )
00215 {
00216   EVTIMER_cancel(timer);
00217   EVTHREAD_OBJECT_free( &timer->object_base );
00218   return 0;
00219 }
00220 
00221 
00222 
00223 static void timer_cb( int fd, short event, void *ctx)
00224 {
00225    EVTIMER *timer;
00226 
00227    M_UNUSED(fd);
00228    M_UNUSED(event);
00229 
00230    timer = (EVTIMER *) ctx;
00231 
00232    event_del( &timer->timer_event );
00233    timer->state = EVTIMER_STATE_INIT;
00234    
00235    MLOG_TRACE( "Timer %p event occured", &timer->timer_event ); 
00236    CTHREAD_resume( timer->object_base.owner->cthread , 0, "%d", timer->timer_id );
00237 }
00238 
00239 // ---------------------------------------------------------------------------
00240 static void socket_cb( int fd, short event, void *ctx);
00241 
00242 EVSOCKET *EVSOCKET_init(EVTHREAD *thread, int fd, int is_connected)
00243 {
00244   EVSOCKET *socket;
00245 
00246   socket = (EVSOCKET *) malloc( sizeof( EVSOCKET ) );
00247   if (!socket) {
00248     return 0;
00249   }
00250 
00251 
00252   socket->fd = fd;
00253   socket->timer_idle_timeout = 0;
00254   socket->timer_io_timeout = 0;
00255   socket->state = is_connected ? EVSOCKET_STATE_CONNECTED : EVSOCKET_STATE_INIT; 
00256   
00257   socket->thread = thread;
00258   socket->loop = thread->loop;
00259   memset( &socket->idle_timeout, 0 , sizeof(struct timeval));
00260 
00261   
00262   if (fd_set_blocking( fd, 0 )) {
00263     free(socket);
00264     return  0;
00265   }
00266   
00267   EVTHREAD_OBJECT_init( &socket->object_base, EVTHREAD_OBJECT_SOCKET , thread );
00268  
00269   event_set( &socket->read_event, fd, EV_READ , socket_cb, (void *) socket );
00270   event_base_set( socket->loop->ev_base, &socket->read_event );
00271 
00272   event_set( &socket->write_event, fd, EV_WRITE , socket_cb, (void *) socket );
00273   event_base_set( socket->loop->ev_base, &socket->write_event );
00274 
00275   return socket;
00276 }   
00277 
00278 
00279 int EVSOCKET_close(EVSOCKET *socket)
00280 {
00281   int rt;
00282 
00283   if ( socket->fd == -1) {
00284     return -1;
00285   }
00286 
00287   if (socket->timer_idle_timeout) {
00288     EVTIMER_free( socket->timer_idle_timeout );
00289   }
00290   if (socket->timer_io_timeout) {
00291     EVTIMER_free( socket->timer_io_timeout);  
00292   }
00293 
00294   event_del( &socket->read_event );   
00295   event_del( &socket->write_event );   
00296   
00297   do {
00298     rt = close(socket->fd);
00299   } while(rt == -1 && errno == EINTR);
00300   socket->fd = -1;
00301 
00302   EVTHREAD_OBJECT_free( &socket->object_base );
00303   return rt;
00304 }
00305 
00306 static void socket_cb( int fd, short event, void *ctx)
00307 {
00308    EVSOCKET *socket; 
00309 
00310    M_UNUSED( fd );
00311    
00312    socket = (EVSOCKET *) ctx;
00313 
00314    if (event & EV_READ && event & EV_WRITE) {
00315      socket->state = EVSOCKET_STATE_ERROR;
00316      CTHREAD_resume( socket->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_ERROR );
00317      return;
00318    }
00319 
00320    if (event & EV_READ) {
00321      if (socket->state == EVSOCKET_STATE_READING) {
00322         MLOG_TRACE( "socket %d read event occured", fd ); 
00323         CTHREAD_resume( socket->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_EVENT);
00324      } else {
00325         MLOG_INFO( "socket %d read event ignored !!!", fd ); 
00326      }
00327    }
00328   
00329    if (event & EV_WRITE) {
00330      if (socket->state == EVSOCKET_STATE_CONNECTING || socket->state == EVSOCKET_STATE_WRITING) {
00331         MLOG_TRACE( "socket %d write event occured", fd ); 
00332         CTHREAD_resume( socket->thread->cthread, 0, "%d", EVENT_ID_HAS_IO_EVENT);
00333      } else {
00334         MLOG_INFO( "socket %d write event ignored !!!", fd ); 
00335      }
00336    }
00337 }
00338 
00339 void EVSOCKET_set_idle_timeout(EVSOCKET *socket, struct timeval timeout )
00340 { 
00341   M_UNUSED( socket );
00342   M_UNUSED( timeout );
00343 
00344   socket->idle_timeout = timeout;
00345   if (socket->state == EVSOCKET_STATE_CONNECTED) {
00346      socket->timer_idle_timeout = EVTIMER_init( socket->thread, TIMER_ID_IDLE_TIMEOUT, timeout );
00347      if (socket->timer_idle_timeout) { 
00348        EVTIMER_start(socket->timer_idle_timeout);       
00349      }
00350   }
00351 }
00352 
00353 int EVSOCKET_connect( EVSOCKET *socket, struct sockaddr *address, socklen_t socklen, struct timeval timeout)
00354 {
00355   int rt;
00356   VALUES *rvalues;
00357   int event_id;
00358 
00359   if (socket->state != EVSOCKET_STATE_INIT) {
00360     return -1;
00361   }
00362 
00363   do {
00364     rt = connect( socket->fd, address, socklen );
00365   } while (rt == -1 && errno == EINTR);
00366  
00367   if (rt == 0) {
00368      MLOG_DEBUG( "socket %d connected", socket->fd ); 
00369      socket->state = EVSOCKET_STATE_CONNECTED;
00370      return 0;
00371   }
00372   if (rt == -1)  {
00373     if (errno == EINPROGRESS) {
00374      socket->state = EVSOCKET_STATE_CONNECTING;
00375 
00376      socket->timer_io_timeout = EVTIMER_init( socket->thread, TIMER_ID_COMM_TIMEOUT, timeout );
00377      if (socket->timer_io_timeout) { 
00378        EVTIMER_start(socket->timer_io_timeout);         
00379      }
00380 
00381      event_add( &socket->write_event, 0 );
00382      
00383      CTHREAD_yield( &rvalues, 0 );
00384      VALUES_scan( rvalues, "%d", &event_id );
00385 
00386      event_del( &socket->write_event );
00387      
00388      if (socket->timer_io_timeout) {
00389        EVTIMER_free( socket->timer_io_timeout ); 
00390        socket->timer_io_timeout = 0; 
00391      }
00392 
00393      switch(event_id) {
00394        case EVENT_ID_HAS_IO_EVENT:
00395          MLOG_DEBUG( "socket %d connected", socket->fd ); 
00396          socket->state = EVSOCKET_STATE_CONNECTED;
00397          rt = 0;
00398          break;
00399        default: 
00400          MLOG_DEBUG( "socket %d connect timeout", socket->fd ); 
00401          socket->state = EVSOCKET_STATE_ERROR;
00402          close( socket->fd );
00403          socket->fd = -1;
00404          rt = -1;
00405          break;
00406       }
00407     } else {
00408       MLOG_DEBUG("socket %d connect error, errno %d", socket->fd, errno );
00409     }
00410   }
00411   return rt;
00412 }
00413 
00414 static int EVSOCKET_recv_internal( EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout )
00415 {
00416    int rt;
00417    int has_event = 0;
00418    VALUES *rvalues;
00419    int event_id;
00420 
00421    if ( socket->state != EVSOCKET_STATE_CONNECTED) {
00422      return -1;
00423    }
00424    
00425 r_again:   
00426    do {
00427      rt = recv( socket->fd, buf, buf_size, flags);
00428    } while(rt == -1 && errno == EINTR);
00429 
00430    if (rt == -1) {
00431      if (errno == EAGAIN) {
00432 
00433        MLOG_TRACE( "socket %d read has blocked", socket->fd ); 
00434        
00435        socket->state = EVSOCKET_STATE_READING;
00436        
00437        socket->timer_io_timeout = EVTIMER_init( socket->thread, TIMER_ID_COMM_TIMEOUT, timeout );
00438        if (socket->timer_io_timeout) {  
00439          EVTIMER_start(socket->timer_io_timeout);       
00440        }
00441 
00442        if (!has_event) {
00443          event_add( &socket->read_event, 0 );
00444          has_event = 1;
00445        }
00446 
00447        CTHREAD_yield( &rvalues, 0 );
00448        VALUES_scan( rvalues, "%d", &event_id );
00449  
00450        if (socket->timer_io_timeout) {  
00451          EVTIMER_free(socket->timer_io_timeout);        
00452          socket->timer_io_timeout = 0; 
00453        }
00454       
00455        switch(event_id) {
00456          case EVENT_ID_HAS_IO_EVENT:
00457            MLOG_TRACE( "socket %d received read event", socket->fd ); 
00458            goto r_again;
00459          default: 
00460            MLOG_DEBUG( "socket %d read timed out", socket->fd ); 
00461            socket->state = EVSOCKET_STATE_ERROR;
00462            close( socket->fd );
00463            socket->fd = -1;
00464            rt = -1;
00465            break;
00466        }
00467      } else {
00468        MLOG_DEBUG( "socket %d read error. errno %d", socket->fd, errno );
00469      }
00470      return -1;
00471    }
00472 
00473    if (has_event) {
00474      event_del( &socket->read_event );
00475    }
00476    
00477    if (rt != -1) {
00478      socket->state = EVSOCKET_STATE_CONNECTED;
00479    } 
00480    
00481    if (rt >= 0) {
00482      if (socket->timer_idle_timeout) { 
00483        EVTIMER_cancel(socket->timer_idle_timeout);
00484      }
00485    }
00486 
00487 
00488    return rt;
00489 } 
00490 
00491 static int EVSOCKET_send_internal( EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout )
00492 {
00493    int rt;
00494    int has_event = 0;
00495    VALUES *rvalues;
00496    int event_id;
00497 
00498 
00499    if ( socket->state != EVSOCKET_STATE_CONNECTED) {
00500      return -1;
00501    }
00502    
00503 w_again:   
00504    do {
00505     rt = send( socket->fd, buf, buf_size, flags);
00506    } while(rt == -1 && errno == EINTR);
00507 
00508    if (rt == -1) {
00509      if (errno == EAGAIN) {
00510 
00511        socket->state = EVSOCKET_STATE_WRITING;
00512        
00513        socket->timer_io_timeout = EVTIMER_init( socket->thread, TIMER_ID_COMM_TIMEOUT, timeout );
00514        if (socket->timer_io_timeout) {
00515          EVTIMER_start(socket->timer_io_timeout);       
00516        }
00517 
00518        if (!has_event) {
00519          event_add( &socket->write_event, 0 );
00520          has_event = 1;
00521        }
00522 
00523        CTHREAD_yield( &rvalues, 0 );
00524        VALUES_scan( rvalues, "%d", &event_id );
00525  
00526        if (socket->timer_io_timeout) {  
00527          EVTIMER_free(socket->timer_io_timeout);        
00528          socket->timer_io_timeout = 0; 
00529        }
00530       
00531        switch(event_id) {
00532          case EVENT_ID_HAS_IO_EVENT:
00533            MLOG_TRACE( "socket %d received write event", socket->fd ); 
00534            goto w_again;
00535          default: 
00536            MLOG_DEBUG( "socket %d write timed out", socket->fd ); 
00537            socket->state = EVSOCKET_STATE_ERROR;
00538            close( socket->fd );
00539            socket->fd = -1;
00540            rt = -1;
00541            break;
00542        }
00543      }
00544    }
00545   
00546    if (has_event) {
00547      event_del( &socket->write_event );
00548    }  
00549 
00550    if (rt != -1) {
00551      socket->state = EVSOCKET_STATE_CONNECTED;
00552    } 
00553 
00554 
00555    return rt;
00556 } 
00557 
00558 int EVSOCKET_recv( EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout )
00559 {
00560   int rt;
00561 
00562   rt = EVSOCKET_recv_internal( socket, buf, buf_size, flags, timeout );
00563   if (rt >= 0) {
00564      if (socket->timer_idle_timeout) { 
00565        EVTIMER_start(socket->timer_idle_timeout);
00566      }
00567   }
00568   return rt;
00569 }
00570 
00571 int EVSOCKET_recv_all( EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout )
00572 {
00573   uint8_t *cur = (uint8_t *) buf;
00574   int pos, rt;
00575 
00576   for(pos = 0 ; buf_size != 0 ; pos += rt ) {
00577     rt = EVSOCKET_recv_internal( socket, cur, buf_size, flags, timeout );
00578     if (rt <= 0) {
00579       return rt;
00580     }
00581     cur += rt;
00582     buf_size -= rt;
00583   }
00584 
00585   if (rt >= 0) {
00586      if (socket->timer_idle_timeout) { 
00587        EVTIMER_start(socket->timer_idle_timeout);
00588      }
00589   }
00590   
00591   return pos;
00592 }
00593 
00594 int EVSOCKET_send( EVSOCKET *socket, void *buf, size_t buf_size, int flags, struct timeval timeout )
00595 {
00596   uint8_t *cur = (uint8_t *) buf;
00597   int pos, rt;
00598 
00599   for(pos = 0 ; buf_size != 0 ; pos += rt ) {
00600 
00601     rt = EVSOCKET_send_internal( socket, cur, buf_size, flags, timeout );
00602     if (rt < 0) {
00603       return -1;
00604     }
00605     cur += rt;
00606     buf_size -= rt;
00607   } 
00608   return pos;
00609 }
00610 
00611 // ---------------------------------------------------------------------------
00612 
00613 static void socket_listener_cb( int fd, short event, void *ctx);
00614 
00615 EVTCPACCEPTOR * EVTCPACCEPTOR_init_ex( EVLOOP *loop, SOCKADDR *addr, int listener_backlog,  EVTHREAD_FACTORY factory, int read_buffer_size, int send_buffer_size, void *ctx )
00616 {
00617   int listener_fd;
00618 
00619   listener_fd = fd_make_tcp_listener( addr, listener_backlog );
00620   if (listener_fd == -1) {
00621     MLOG_ERROR( "Can't listen on address %s. error errno %d", SOCKADDR_to_string( addr ), errno );
00622     return 0; 
00623   }
00624 
00625   return EVTCPACCEPTOR_init( loop, listener_fd, factory, read_buffer_size, send_buffer_size, ctx );
00626  
00627 
00628 }
00629 
00630 EVTCPACCEPTOR * EVTCPACCEPTOR_init( EVLOOP *loop, int fd, EVTHREAD_FACTORY factory, int read_buffer_size, int send_buffer_size, void *ctx )
00631 {
00632   EVTCPACCEPTOR *acceptor;
00633 
00634   acceptor = (EVTCPACCEPTOR *) malloc( sizeof( EVTCPACCEPTOR ) );
00635   if (!acceptor) {
00636     return 0;
00637   }
00638   
00639   if (fd_set_blocking( fd, 0 )) {
00640     free(acceptor);
00641     return  0;
00642   }
00643 
00644   acceptor->loop = loop;
00645   acceptor->factory = factory;
00646   acceptor->fd = fd;
00647   acceptor->ctx = ctx;
00648 
00649   acceptor->read_buffer_size = read_buffer_size;
00650   acceptor->send_buffer_size = send_buffer_size;
00651 
00652   event_set( &acceptor->read_event, fd, EV_READ | EV_PERSIST, socket_listener_cb, (void *) acceptor );
00653   event_base_set( loop->ev_base, &acceptor->read_event );
00654   event_add( &acceptor->read_event, 0 );
00655 
00656   return acceptor;
00657 }
00658 
00659 void EVTCPACCEPTOR_close(EVTCPACCEPTOR *acceptor)
00660 {
00661   event_del(&acceptor->read_event);
00662   if (acceptor->fd != -1) {
00663     close(acceptor->fd);
00664   }
00665   free(acceptor);
00666 }
00667 
00668 
00669 
00670 static void socket_listener_cb( int fd, short event, void *ctx)
00671 {
00672    EVTCPACCEPTOR *acceptor;
00673    EVTHREAD_PROC thread_proc;
00674    void *thread_ctx;
00675    EVTHREAD *thread; 
00676    EVSOCKET *socket;
00677    int sock;
00678 
00679    M_UNUSED(fd);
00680 
00681    if (event & EV_READ && event & EV_WRITE) {
00682       return;
00683    }
00684 
00685    acceptor = (EVTCPACCEPTOR *) ctx;
00686  
00687    do {
00688      sock = accept( acceptor->fd, 0, 0 );
00689    } while( sock == -1 && errno == EINTR);
00690 
00691    if (sock == -1) {
00692      return;
00693    }
00694 
00695    MLOG_TRACE( "socket %d accepted", sock ); 
00696  
00697    
00698    if (acceptor->read_buffer_size != -1) {
00699      fd_set_buf_size( sock, Receive_buffer, acceptor->read_buffer_size );
00700    }
00701 
00702    if (acceptor->send_buffer_size != -1) {
00703      fd_set_buf_size( sock, Send_buffer, acceptor->send_buffer_size);
00704    }
00705   
00706    // get thread procedure and thread argument data.
00707    if ( acceptor->factory( sock, &thread_proc, &thread_ctx, acceptor->ctx ) ) {
00708      close(sock);
00709      return;
00710    }
00711 
00712 
00713    // create the user thread.
00714    thread =EVTHREAD_init( acceptor->loop, thread_proc, thread_ctx);
00715    if (!thread) {
00716      close(sock);
00717      return;
00718    }
00719 
00720    socket = EVSOCKET_init(thread, sock, 1);
00721    if (!socket) {
00722      //close thread)
00723      close(sock);
00724      return;
00725    }
00726 
00727    EVTHREAD_start( thread, socket );
00728 
00729 }
00730 
00731 void custom_timeout_handling( struct event_base *base, struct timeval *tv )
00732 {
00733    M_UNUSED(base);
00734    M_UNUSED(tv);
00735 }