Simple tools for multi threading / objects in plain C Snapshot
THREADPOOL

A thread pool with a fixed number of worker threads.

Classes

struct  tagTHREADPOOL

Modules

CBARRIER
 synchronization point for a fixed party of threads.


Typedefs

typedef struct tagTHREADPOOL THREADPOOL

Functions

THREADPOOL * THREADPOOL_init (RUNNABLE_HANDLER process_result, int queue_size, int num_threads, int stack_size_kb)
 constructs a thread pool and starts it.
void THREADPOOL_close (THREADPOOL *pool)
int THREADPOOL_send_block_on_queue_full (THREADPOOL *pool, RUNNABLE *request)
 posts a work request to the pool; blocks if request queue limit is reached
int THREADPOOL_send_fail_on_queue_full (THREADPOOL *pool, RUNNABLE *request)
 posts a work request to the pool; fails if request queue limit is reached

Detailed Description

a thread pool with fixed number of worker threads.

The pool has a request queue: new requests are enqueued into it; the worker threads dequeue a work request and invoke the 'run' method of the work request.

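On creation of the thread pool the following parameters are specified: the process_result callback, the size limit of the request queue, the number of worker threads and the thread stack size (see THREADPOOL_init).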

It is very important to set a reasonable size limit on the request queue. An unlimited queue can easily lead to out-of-memory conditions: if work requests arrive faster than they can be processed, the queue grows without bound. A bounded request buffer guards against running the system at a load that is higher than its capacity.

In this situation the request queue acts as a bounded buffer that accounts for temporary fluctuations of the load: it can accommodate temporary peaks (i.e. short periods of time when requests arrive faster than they are processed), on condition that the request load later falls back to normal.
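
The effect of the size limit can be illustrated with a small simulation. The following is a minimal sketch and not part of the library: sketch_simulate, its parameters and the tick-based load model are assumptions made for illustration. In each tick a number of requests arrive, arrivals that do not fit under queue_limit are rejected, and the workers then remove at most service_per_tick requests from the queue.

```c
#include <stddef.h>

/* Hypothetical load model, not part of the library.
 * arrivals[t]       number of requests arriving in tick t
 * service_per_tick  number of requests the workers finish per tick
 * queue_limit       size limit of the request queue
 * Returns the number of rejected requests; *max_queued receives the
 * highest queue length that was observed. */
static int sketch_simulate(const int *arrivals, size_t ticks,
                           int service_per_tick, int queue_limit,
                           int *max_queued)
{
  int queued = 0, rejected = 0;
  size_t t;

  *max_queued = 0;
  for (t = 0; t < ticks; t++) {
    int accepted = arrivals[t];

    /* requests that do not fit into the bounded queue are rejected */
    if (queued + accepted > queue_limit) {
      rejected += queued + accepted - queue_limit;
      accepted = queue_limit - queued;
    }
    queued += accepted;
    if (queued > *max_queued) {
      *max_queued = queued;
    }

    /* the workers drain the queue at their fixed capacity */
    queued -= (queued < service_per_tick) ? queued : service_per_tick;
  }
  return rejected;
}
```

With a service rate of 3 per tick and a limit of 10, the arrival sequence 2, 2, 6, 6, 2, 2, 0, 0 is absorbed without any rejection, whereas a sustained arrival rate of 6 per tick reaches the limit in the third tick and requests are rejected from then on.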

You might find this class similar to the Java class java.util.concurrent.ThreadPoolExecutor when used with a fixed thread pool size.


Typedef Documentation

typedef struct tagTHREADPOOL THREADPOOL

Function Documentation

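void THREADPOOL_close ( THREADPOOL * pool )

stops the pool: posts one exit message per worker thread to the request queue, waits until all workers have finished, then frees the pool.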

Definition at line 83 of file tpool.c.

{
  int i;

  for( i=0; i < pool->num_threads; i++) {
    TQUEUE_push_exit_message( &pool->request_queue);
  }
  CYCLIC_BARRIER_await( &pool->all_finished );
  free(pool);
}

THREADPOOL* THREADPOOL_init ( RUNNABLE_HANDLER process_result, int queue_size, int num_threads, int stack_size_kb )

constructs a thread pool and starts it.

The method

  • creates request queue
  • runs fixed number of worker threads

The worker thread

  • fetches a work request from the request queue
  • invokes the handle_request callback stored in the work request
  • if process_result callback has been registered: invokes process_result callback (in order to have common policy of processing the results).

Parameters:
  process_result  a callback that receives the work request after its run method has been invoked. Value can be 0 (in this case it is not invoked)
  queue_size      limit on the request queue (for details see THREADPOOL)
  num_threads     number of worker threads
  stack_size_kb   stack size of each worker thread in kilobytes (-1 for the default stack size)
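
The per-request behaviour of a worker described above can be sketched as follows. This is an illustration only: SKETCH_RUNNABLE, SKETCH_HANDLER and the sketch_ functions are hypothetical stand-ins, since the definitions of RUNNABLE and RUNNABLE_HANDLER are not shown in this section.

```c
#include <stddef.h>

/* Hypothetical stand-ins for the library's RUNNABLE and RUNNABLE_HANDLER. */
typedef struct sketch_runnable {
  void (*run)(struct sketch_runnable *self); /* does the actual work */
  int input;
  int output;
} SKETCH_RUNNABLE;

typedef void (*SKETCH_HANDLER)(SKETCH_RUNNABLE *request);

/* What a worker does with one dequeued request. */
static void sketch_handle_request(SKETCH_RUNNABLE *request,
                                  SKETCH_HANDLER process_result)
{
  request->run(request);
  if (process_result != NULL) {   /* 0 means: no result callback */
    process_result(request);
  }
}

/* Example request: squares its input. */
static void sketch_square(SKETCH_RUNNABLE *self)
{
  self->output = self->input * self->input;
}

/* Example common result policy: add up all outputs. */
static int sketch_total = 0;

static void sketch_collect(SKETCH_RUNNABLE *request)
{
  sketch_total += request->output;
}
```

Calling sketch_handle_request(&request, sketch_collect) runs the request and then hands it to the common result policy; passing 0 as the second argument only runs it. In a real pool the callback is invoked on the worker threads, so shared state such as sketch_total would have to be protected by a lock.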

Definition at line 44 of file tpool.c.

{
  THREADPOOL *pool;
  pthread_t pth;
  pthread_attr_t attr;
  int i, rt;

  pool = (THREADPOOL *) malloc( sizeof(THREADPOOL) );
  if (!pool) {
    return 0;
  }

  pool->process_result = process_result;
  pool->num_threads = num_threads;

  TQUEUE_init( &pool->request_queue, queue_size );

  pthread_attr_init( &attr );
  pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );
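  /* keep the default stack size unless a positive size is requested (-1 means default) */
  if (stack_size_kb > 0) {
    pthread_attr_setstacksize( &attr, (size_t) stack_size_kb * 1024 );
  }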

  for( i = 0; i < num_threads; i++) {

     if ((rt = pthread_create( &pth, &attr, worker_thread, pool ) )  != 0) {
        break;
     }
  }
  if ( i < num_threads) {
     errorp(rt, "Can't create thread # %d", i);
     CYCLIC_BARRIER_init( &pool->all_finished, i + 1 );
     THREADPOOL_close( pool );
     return 0;
  }

  CYCLIC_BARRIER_init( &pool->all_finished, num_threads + 1 );

  return pool;
}

int THREADPOOL_send_block_on_queue_full ( THREADPOOL * pool, RUNNABLE * request )

posts a work request to the pool; blocks if request queue limit is reached

A request is always queued. If the request queue limit is reached, this method blocks until the size of the queue falls below the limit; the request is then enqueued and this method returns.

Returns 0 on success, -1 on failure (can't allocate memory for request)
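
The blocking behaviour can be sketched with a mutex and two condition variables. This is a minimal illustration under stated assumptions, not the library's TQUEUE: SKETCH_QUEUE and all sketch_ names are hypothetical, the queue stores plain ints by value (so there is no allocation failure path), and the value -1 is used as an exit marker. Compile with -pthread.

```c
#include <pthread.h>
#include <stddef.h>

#define SKETCH_CAPACITY 2

/* Hypothetical bounded queue of ints; not the library's TQUEUE. */
typedef struct {
  int items[SKETCH_CAPACITY];
  int head, count;
  pthread_mutex_t lock;
  pthread_cond_t not_full, not_empty;
} SKETCH_QUEUE;

typedef struct { SKETCH_QUEUE queue; long sum; } SKETCH_DEMO;

/* Blocks while the queue is at its size limit, then enqueues. */
static void sketch_push_block(SKETCH_QUEUE *q, int item)
{
  pthread_mutex_lock(&q->lock);
  while (q->count == SKETCH_CAPACITY) {
    pthread_cond_wait(&q->not_full, &q->lock);
  }
  q->items[(q->head + q->count) % SKETCH_CAPACITY] = item;
  q->count++;
  pthread_cond_signal(&q->not_empty);
  pthread_mutex_unlock(&q->lock);
}

/* Blocks while the queue is empty, then dequeues. */
static int sketch_pop(SKETCH_QUEUE *q)
{
  int item;

  pthread_mutex_lock(&q->lock);
  while (q->count == 0) {
    pthread_cond_wait(&q->not_empty, &q->lock);
  }
  item = q->items[q->head];
  q->head = (q->head + 1) % SKETCH_CAPACITY;
  q->count--;
  pthread_cond_signal(&q->not_full);
  pthread_mutex_unlock(&q->lock);
  return item;
}

/* Consumer thread: sums items until it receives the exit marker -1. */
static void *sketch_consumer(void *arg)
{
  SKETCH_DEMO *demo = (SKETCH_DEMO *) arg;
  int item;

  while ((item = sketch_pop(&demo->queue)) != -1) {
    demo->sum += item;
  }
  return NULL;
}

/* Pushes 1..n through a queue of capacity 2; returns the consumer's sum. */
static long sketch_demo(int n)
{
  SKETCH_DEMO demo;
  pthread_t consumer;
  int i;

  demo.sum = 0;
  demo.queue.head = demo.queue.count = 0;
  pthread_mutex_init(&demo.queue.lock, NULL);
  pthread_cond_init(&demo.queue.not_full, NULL);
  pthread_cond_init(&demo.queue.not_empty, NULL);

  if (pthread_create(&consumer, NULL, sketch_consumer, &demo) != 0) {
    return -1;
  }
  for (i = 1; i <= n; i++) {
    sketch_push_block(&demo.queue, i);  /* waits whenever 2 items are pending */
  }
  sketch_push_block(&demo.queue, -1);   /* exit marker */
  pthread_join(consumer, NULL);

  pthread_cond_destroy(&demo.queue.not_empty);
  pthread_cond_destroy(&demo.queue.not_full);
  pthread_mutex_destroy(&demo.queue.lock);
  return demo.sum;
}
```

The producer never loses a request: whenever two items are pending, sketch_push_block waits until the consumer has removed one. The price is that a slow consumer slows down the caller.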

Definition at line 94 of file tpool.c.

{
  return TQUEUE_push_block_on_queue_full( &pool->request_queue, request);
}

int THREADPOOL_send_fail_on_queue_full ( THREADPOOL * pool, RUNNABLE * request )

posts a work request to the pool; fails if request queue limit is reached

A request is queued if the request queue did not reach its size limit. If request queue limit is reached then this method returns an error.

Returns 0 on success, -1 on failure
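
The non-blocking variant can be sketched as a push that checks the limit under the lock and returns immediately. As before this is only an illustration: SKETCH_BOUNDED, sketch_bounded and sketch_push_fail are hypothetical names, and the queue holds plain ints rather than work requests.

```c
#include <pthread.h>

#define SKETCH_LIMIT 2

/* Hypothetical bounded queue of ints; not the library's TQUEUE. */
typedef struct {
  int items[SKETCH_LIMIT];
  int count;
  pthread_mutex_t lock;
} SKETCH_BOUNDED;

static SKETCH_BOUNDED sketch_bounded = { {0}, 0, PTHREAD_MUTEX_INITIALIZER };

/* Enqueues if there is room; returns 0 on success, -1 if the limit is
 * reached. Never waits for a consumer. */
static int sketch_push_fail(SKETCH_BOUNDED *q, int item)
{
  int rc = -1;

  pthread_mutex_lock(&q->lock);
  if (q->count < SKETCH_LIMIT) {
    q->items[q->count++] = item;
    rc = 0;
  }
  pthread_mutex_unlock(&q->lock);
  return rc;
}
```

With a limit of 2, the first two calls succeed and a third call returns -1 without changing the queue. The caller then decides how to react, for example by dropping the request, retrying later or reporting an overload condition.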

Definition at line 99 of file tpool.c.

{
  return TQUEUE_push_fail_on_queue_full( &pool->request_queue, request);
}