Simple tools for multi threading / objects in plain C Snapshot
tpool.c
Go to the documentation of this file.
00001 #include "tpool.h"
00002 #include <butils/errorp.h>
00003 #include <pthread.h>
00004 
00005 
00006 //---
00007 
00008 void RUNNABLE_init(RUNNABLE *runnable, RUNNABLE_HANDLER handler, RUNNABLE_HANDLER free_request  )
00009 {
00010    runnable->handle_request = handler;
00011    runnable->free_request = free_request;  
00012 }
00013 
00014 
00015 void RUNNABLE_free(RUNNABLE *runnable )
00016 {
00017   if (runnable->free_request) {
00018     runnable->free_request( runnable );
00019   }
00020   free( runnable );
00021 }
00022 
00023 //---
00024 
00025 static void * worker_thread( void * arg)
00026 {
00027   THREADPOOL *pool = (THREADPOOL *) arg;
00028   RUNNABLE *req;
00029 
00030   while ( (req = TQUEUE_pop( &pool->request_queue ) ) != 0 ) {
00031      
00032      req->handle_request( req );
00033      if (pool->process_result != 0) {
00034         pool->process_result( req ); 
00035      }
00036   }
00037 
00038   CYCLIC_BARRIER_await( &pool->all_finished );
00039 
00040   return 0;
00041 }
00042 
00043 
00044 THREADPOOL *THREADPOOL_init( RUNNABLE_HANDLER process_result, int queue_size, int num_threads, int stack_size_kb )
00045 {
00046   THREADPOOL *pool;
00047   pthread_t pth;
00048   pthread_attr_t attr;
00049   int i, rt;
00050 
00051   pool = (THREADPOOL *) malloc( sizeof(THREADPOOL) );
00052   if (!pool) {
00053     return 0;
00054   }
00055 
00056   pool->process_result = process_result;
00057   pool->num_threads = num_threads;
00058 
00059   TQUEUE_init( &pool->request_queue, queue_size );
00060 
00061   pthread_attr_init( &attr );
00062   pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );
00063   pthread_attr_setstacksize( &attr, stack_size_kb * 1024 );
00064 
00065   for( i = 0; i < num_threads; i++) {
00066 
00067      if ((rt = pthread_create( &pth, &attr, worker_thread, pool ) )  != 0) {
00068         break;
00069      }
00070   }
00071   if ( i < num_threads) {
00072      errorp(rt, "Can't create thread # %d", i);
00073      CYCLIC_BARRIER_init( &pool->all_finished, i + 1 );
00074      THREADPOOL_close( pool );
00075      return 0;
00076   }
00077 
00078   CYCLIC_BARRIER_init( &pool->all_finished, num_threads + 1 );
00079 
00080   return pool;
00081 }
00082 
00083 void THREADPOOL_close( THREADPOOL *pool )
00084 {
00085   int i;
00086 
00087   for( i=0; i < pool->num_threads; i++) {
00088     TQUEUE_push_exit_message( &pool->request_queue);
00089   }
00090   CYCLIC_BARRIER_await( &pool->all_finished );
00091   free(pool);
00092 }
00093 
00094 int THREADPOOL_send_block_on_queue_full( THREADPOOL *pool, RUNNABLE *request)
00095 {
00096   return TQUEUE_push_block_on_queue_full( &pool->request_queue, request);
00097 }
00098 
00099 int THREADPOOL_send_fail_on_queue_full( THREADPOOL *pool, RUNNABLE *request)
00100 {
00101   return TQUEUE_push_fail_on_queue_full( &pool->request_queue, request);
00102 }
00103 
00104 
00105 
00106 
00107 
00108    
00109