Simple tools for multi threading / objects in plain C Snapshot
tqueue.c
Go to the documentation of this file.
00001 /* Copyright (c) Michael Moser (2011) . 3-clause BSD License applies */
00002 
00003 #include "tqueue.h"
00004 #include <errno.h>
00005 #include <stdio.h>
00006 #include <butils/errorp.h>
00007 
00008 typedef struct tagTQUEUE_Entry {
00009   DLIST_entry  dlist;
00010   void *entry;
00011 } TQUEUE_Entry;
00012 
00013 
00014 int TQUEUE_init(TQUEUE *queue, size_t max_count )
00015 {
00016   pthread_mutex_init( &queue->mutex, 0 );
00017   pthread_cond_init( &queue->cond_empty, 0 );
00018   pthread_cond_init( &queue->cond_max, 0 );
00019 
00020   DLIST_init( &queue->dlist );
00021 
00022   queue->max_count = max_count;  
00023   queue->waiting_empty = 0;
00024   return 0;
00025 }
00026 
00027 int   TQUEUE_free(TQUEUE *queue )
00028 {
00029   pthread_cond_destroy( &queue->cond_empty );
00030   pthread_cond_destroy( &queue->cond_max );
00031   pthread_mutex_destroy( &queue->mutex );
00032   return 0;
00033 }
00034 
00035 static int push_entry(DLIST *lst, void *data )
00036 {
00037   TQUEUE_Entry *entry = (TQUEUE_Entry *) malloc( sizeof(TQUEUE_Entry) );
00038   if (!entry) {
00039     return -1;
00040   }
00041   entry->entry = data;
00042   DLIST_push_front( lst, (DLIST_entry *) entry );
00043   return 0;
00044 }
00045 
00046 int   TQUEUE_push_exit_message(TQUEUE *queue)
00047 {
00048   int rt = 0,r;
00049   TQUEUE_Entry *entry; 
00050 
00051   if ((r = pthread_mutex_lock(&queue->mutex)) != 0) {
00052     errorp(r, "pthread_mutex_lock failed");
00053     return -1;
00054   }
00055   entry = (TQUEUE_Entry *) malloc( sizeof(TQUEUE_Entry) );
00056   entry->entry = 0;
00057   
00058   if (entry) {
00059     DLIST_push_back( &queue->dlist, (DLIST_entry *) entry );
00060     if (queue->waiting_empty > 0) {
00061       if ((r = pthread_cond_signal( &queue->cond_empty )) != 0) {
00062         errorp(r, "pthread_cond_signal failed");
00063       }
00064     }
00065   } else {
00066     rt = -1;
00067   }
00068 
00069   if ((r = pthread_mutex_unlock( &queue->mutex )) != 0) {
00070     errorp( r, "pthread_mutex_unlock failed");
00071     return -1;
00072   }
00073    
00074   return rt;
00075 }
00076 
00077 int TQUEUE_push_block_on_queue_full(TQUEUE *queue, void *entry)
00078 {
00079   int r;
00080 
00081   if ((r = pthread_mutex_lock(&queue->mutex)) != 0 ) {
00082     errorp(r,"pthread_mutex_lock failed");
00083     return -1;
00084   }
00085   int rt = 0;
00086 
00087   if (queue->max_count != 0 &&
00088       DLIST_size( &queue->dlist ) >= queue->max_count) {
00089         
00090       if ((r = pthread_cond_wait( &queue->cond_max, &queue->mutex )) != 0 ) {
00091         errorp(r,"pthread_cond_wait failed");
00092       }
00093   }
00094   
00095   if (!push_entry( &queue->dlist, entry )) {
00096     if (queue->waiting_empty > 0) {
00097       if ((r = pthread_cond_signal( &queue->cond_empty )) != 0)  {
00098         errorp(r, "pthread_cond_signal failed");
00099       }
00100     }
00101   } else {
00102     rt = -1;
00103   }
00104 
00105   if ((r = pthread_mutex_unlock( &queue->mutex )) != 0) {
00106     errorp(r, "pthread_mutex_unlock fails");
00107     return -1;
00108   }
00109   return rt;
00110 
00111 }
00112 
00113 int TQUEUE_push_fail_on_queue_full(TQUEUE *queue, void *entry)
00114 {
00115   int rt = 0, r;
00116 
00117   if ((r = pthread_mutex_lock(&queue->mutex)) != 0) {
00118     errorp(r, "pthread_mutex_lock failed");
00119     return -1;
00120   }
00121 
00122   if (queue->max_count != 0 &&
00123     DLIST_size( &queue->dlist ) >= queue->max_count) {
00124     rt = -1;
00125   } else {
00126   
00127     if (! push_entry( &queue->dlist, entry )) {
00128       if (queue->waiting_empty > 0) {
00129          rt = pthread_cond_signal( &queue->cond_empty );
00130          if (rt) {
00131            errorp(rt, "pthread_cond_signal failed");
00132          } 
00133       }
00134     } else {
00135       rt = -1;
00136     }
00137   }
00138   if ((r = pthread_mutex_unlock( &queue->mutex )) != 0) {
00139     errorp(r, "pthread_mutex_unlock failed");
00140     return -1;
00141   }
00142 
00143   return rt;
00144 }
00145 
00146 
00147 int   TQUEUE_pop_non_blocking(TQUEUE *queue, void **rret)
00148 {
00149     TQUEUE_Entry  *entry = 0;
00150     void *ret = 0;
00151 
00152     if (pthread_mutex_lock(&queue->mutex)) {
00153       perror("pthread_mutex_lock failed");
00154       return 0;
00155     }
00156     if (!DLIST_isempty( &queue->dlist)) {
00157         entry = (TQUEUE_Entry  *) DLIST_pop_back( &queue->dlist );
00158     }
00159     pthread_mutex_unlock( &queue->mutex );
00160 
00161     if (entry) {
00162       ret = entry->entry;
00163       free(entry);
00164     }
00165 
00166     if (ret != 0) {
00167       return 0;
00168     }
00169     *rret = ret;
00170     return -1;
00171 }
00172 
00173 void *TQUEUE_pop(TQUEUE *queue)
00174 {
00175     TQUEUE_Entry  *entry;
00176     void *ret  = 0;
00177     int rt;
00178 
00179     if (pthread_mutex_lock(&queue->mutex)) {
00180       perror("pthread_mutex_lock failed");
00181       return 0;
00182     }
00183 
00184     if (DLIST_isempty( &queue->dlist ) ) {
00185          queue->waiting_empty ++;
00186          rt = pthread_cond_wait( &queue->cond_empty, &queue->mutex );
00187          if (rt) {
00188            perror("pthread_cond_wait failed");
00189          }
00190          queue->waiting_empty --;
00191     }
00192 
00193     entry = (TQUEUE_Entry  *) DLIST_pop_back( &queue->dlist );
00194     if (entry) {
00195       ret = entry->entry;
00196       free(entry);
00197     }
00198 
00199 
00200     if (queue->max_count != 0 &&
00201         DLIST_size( &queue->dlist ) == (queue->max_count-1)) {
00202         rt = pthread_cond_signal( &queue->cond_max );
00203         if (rt) {
00204            perror("pthread_cond_signal failed");
00205         }
00206     } 
00207 
00208     pthread_mutex_unlock( &queue->mutex );
00209 
00210     return ret;
00211 }
00212 
00213 
00214