Simple tools for multi threading / objects in plain C Snapshot
|
00001 #include "tpool.h" 00002 #include <butils/errorp.h> 00003 #include <pthread.h> 00004 00005 00006 //--- 00007 00008 void RUNNABLE_init(RUNNABLE *runnable, RUNNABLE_HANDLER handler, RUNNABLE_HANDLER free_request ) 00009 { 00010 runnable->handle_request = handler; 00011 runnable->free_request = free_request; 00012 } 00013 00014 00015 void RUNNABLE_free(RUNNABLE *runnable ) 00016 { 00017 if (runnable->free_request) { 00018 runnable->free_request( runnable ); 00019 } 00020 free( runnable ); 00021 } 00022 00023 //--- 00024 00025 static void * worker_thread( void * arg) 00026 { 00027 THREADPOOL *pool = (THREADPOOL *) arg; 00028 RUNNABLE *req; 00029 00030 while ( (req = TQUEUE_pop( &pool->request_queue ) ) != 0 ) { 00031 00032 req->handle_request( req ); 00033 if (pool->process_result != 0) { 00034 pool->process_result( req ); 00035 } 00036 } 00037 00038 CYCLIC_BARRIER_await( &pool->all_finished ); 00039 00040 return 0; 00041 } 00042 00043 00044 THREADPOOL *THREADPOOL_init( RUNNABLE_HANDLER process_result, int queue_size, int num_threads, int stack_size_kb ) 00045 { 00046 THREADPOOL *pool; 00047 pthread_t pth; 00048 pthread_attr_t attr; 00049 int i, rt; 00050 00051 pool = (THREADPOOL *) malloc( sizeof(THREADPOOL) ); 00052 if (!pool) { 00053 return 0; 00054 } 00055 00056 pool->process_result = process_result; 00057 pool->num_threads = num_threads; 00058 00059 TQUEUE_init( &pool->request_queue, queue_size ); 00060 00061 pthread_attr_init( &attr ); 00062 pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED ); 00063 pthread_attr_setstacksize( &attr, stack_size_kb * 1024 ); 00064 00065 for( i = 0; i < num_threads; i++) { 00066 00067 if ((rt = pthread_create( &pth, &attr, worker_thread, pool ) ) != 0) { 00068 break; 00069 } 00070 } 00071 if ( i < num_threads) { 00072 errorp(rt, "Can't create thread # %d", i); 00073 CYCLIC_BARRIER_init( &pool->all_finished, i + 1 ); 00074 THREADPOOL_close( pool ); 00075 return 0; 00076 } 00077 00078 CYCLIC_BARRIER_init( &pool->all_finished, num_threads + 1 ); 00079 00080 return pool; 00081 } 00082 00083 void THREADPOOL_close( THREADPOOL *pool ) 00084 { 00085 int i; 00086 00087 for( i=0; i < pool->num_threads; i++) { 00088 TQUEUE_push_exit_message( &pool->request_queue); 00089 } 00090 CYCLIC_BARRIER_await( &pool->all_finished ); 00091 free(pool); 00092 } 00093 00094 int THREADPOOL_send_block_on_queue_full( THREADPOOL *pool, RUNNABLE *request) 00095 { 00096 return TQUEUE_push_block_on_queue_full( &pool->request_queue, request); 00097 } 00098 00099 int THREADPOOL_send_fail_on_queue_full( THREADPOOL *pool, RUNNABLE *request) 00100 { 00101 return TQUEUE_push_fail_on_queue_full( &pool->request_queue, request); 00102 } 00103 00104 00105 00106 00107 00108 00109