/* Simple tools for multi-threading / objects in plain C - Snapshot */
00001 /* Copyright (c) Michael Moser (2011) . 3-clause BSD License applies */ 00002 00003 #include "tqueue.h" 00004 #include <errno.h> 00005 #include <stdio.h> 00006 #include <butils/errorp.h> 00007 00008 typedef struct tagTQUEUE_Entry { 00009 DLIST_entry dlist; 00010 void *entry; 00011 } TQUEUE_Entry; 00012 00013 00014 int TQUEUE_init(TQUEUE *queue, size_t max_count ) 00015 { 00016 pthread_mutex_init( &queue->mutex, 0 ); 00017 pthread_cond_init( &queue->cond_empty, 0 ); 00018 pthread_cond_init( &queue->cond_max, 0 ); 00019 00020 DLIST_init( &queue->dlist ); 00021 00022 queue->max_count = max_count; 00023 queue->waiting_empty = 0; 00024 return 0; 00025 } 00026 00027 int TQUEUE_free(TQUEUE *queue ) 00028 { 00029 pthread_cond_destroy( &queue->cond_empty ); 00030 pthread_cond_destroy( &queue->cond_max ); 00031 pthread_mutex_destroy( &queue->mutex ); 00032 return 0; 00033 } 00034 00035 static int push_entry(DLIST *lst, void *data ) 00036 { 00037 TQUEUE_Entry *entry = (TQUEUE_Entry *) malloc( sizeof(TQUEUE_Entry) ); 00038 if (!entry) { 00039 return -1; 00040 } 00041 entry->entry = data; 00042 DLIST_push_front( lst, (DLIST_entry *) entry ); 00043 return 0; 00044 } 00045 00046 int TQUEUE_push_exit_message(TQUEUE *queue) 00047 { 00048 int rt = 0,r; 00049 TQUEUE_Entry *entry; 00050 00051 if ((r = pthread_mutex_lock(&queue->mutex)) != 0) { 00052 errorp(r, "pthread_mutex_lock failed"); 00053 return -1; 00054 } 00055 entry = (TQUEUE_Entry *) malloc( sizeof(TQUEUE_Entry) ); 00056 entry->entry = 0; 00057 00058 if (entry) { 00059 DLIST_push_back( &queue->dlist, (DLIST_entry *) entry ); 00060 if (queue->waiting_empty > 0) { 00061 if ((r = pthread_cond_signal( &queue->cond_empty )) != 0) { 00062 errorp(r, "pthread_cond_signal failed"); 00063 } 00064 } 00065 } else { 00066 rt = -1; 00067 } 00068 00069 if ((r = pthread_mutex_unlock( &queue->mutex )) != 0) { 00070 errorp( r, "pthread_mutex_unlock failed"); 00071 return -1; 00072 } 00073 00074 return rt; 00075 } 00076 00077 int 
TQUEUE_push_block_on_queue_full(TQUEUE *queue, void *entry) 00078 { 00079 int r; 00080 00081 if ((r = pthread_mutex_lock(&queue->mutex)) != 0 ) { 00082 errorp(r,"pthread_mutex_lock failed"); 00083 return -1; 00084 } 00085 int rt = 0; 00086 00087 if (queue->max_count != 0 && 00088 DLIST_size( &queue->dlist ) >= queue->max_count) { 00089 00090 if ((r = pthread_cond_wait( &queue->cond_max, &queue->mutex )) != 0 ) { 00091 errorp(r,"pthread_cond_wait failed"); 00092 } 00093 } 00094 00095 if (!push_entry( &queue->dlist, entry )) { 00096 if (queue->waiting_empty > 0) { 00097 if ((r = pthread_cond_signal( &queue->cond_empty )) != 0) { 00098 errorp(r, "pthread_cond_signal failed"); 00099 } 00100 } 00101 } else { 00102 rt = -1; 00103 } 00104 00105 if ((r = pthread_mutex_unlock( &queue->mutex )) != 0) { 00106 errorp(r, "pthread_mutex_unlock fails"); 00107 return -1; 00108 } 00109 return rt; 00110 00111 } 00112 00113 int TQUEUE_push_fail_on_queue_full(TQUEUE *queue, void *entry) 00114 { 00115 int rt = 0, r; 00116 00117 if ((r = pthread_mutex_lock(&queue->mutex)) != 0) { 00118 errorp(r, "pthread_mutex_lock failed"); 00119 return -1; 00120 } 00121 00122 if (queue->max_count != 0 && 00123 DLIST_size( &queue->dlist ) >= queue->max_count) { 00124 rt = -1; 00125 } else { 00126 00127 if (! 
push_entry( &queue->dlist, entry )) { 00128 if (queue->waiting_empty > 0) { 00129 rt = pthread_cond_signal( &queue->cond_empty ); 00130 if (rt) { 00131 errorp(rt, "pthread_cond_signal failed"); 00132 } 00133 } 00134 } else { 00135 rt = -1; 00136 } 00137 } 00138 if ((r = pthread_mutex_unlock( &queue->mutex )) != 0) { 00139 errorp(r, "pthread_mutex_unlock failed"); 00140 return -1; 00141 } 00142 00143 return rt; 00144 } 00145 00146 00147 int TQUEUE_pop_non_blocking(TQUEUE *queue, void **rret) 00148 { 00149 TQUEUE_Entry *entry = 0; 00150 void *ret = 0; 00151 00152 if (pthread_mutex_lock(&queue->mutex)) { 00153 perror("pthread_mutex_lock failed"); 00154 return 0; 00155 } 00156 if (!DLIST_isempty( &queue->dlist)) { 00157 entry = (TQUEUE_Entry *) DLIST_pop_back( &queue->dlist ); 00158 } 00159 pthread_mutex_unlock( &queue->mutex ); 00160 00161 if (entry) { 00162 ret = entry->entry; 00163 free(entry); 00164 } 00165 00166 if (ret != 0) { 00167 return 0; 00168 } 00169 *rret = ret; 00170 return -1; 00171 } 00172 00173 void *TQUEUE_pop(TQUEUE *queue) 00174 { 00175 TQUEUE_Entry *entry; 00176 void *ret = 0; 00177 int rt; 00178 00179 if (pthread_mutex_lock(&queue->mutex)) { 00180 perror("pthread_mutex_lock failed"); 00181 return 0; 00182 } 00183 00184 if (DLIST_isempty( &queue->dlist ) ) { 00185 queue->waiting_empty ++; 00186 rt = pthread_cond_wait( &queue->cond_empty, &queue->mutex ); 00187 if (rt) { 00188 perror("pthread_cond_wait failed"); 00189 } 00190 queue->waiting_empty --; 00191 } 00192 00193 entry = (TQUEUE_Entry *) DLIST_pop_back( &queue->dlist ); 00194 if (entry) { 00195 ret = entry->entry; 00196 free(entry); 00197 } 00198 00199 00200 if (queue->max_count != 0 && 00201 DLIST_size( &queue->dlist ) == (queue->max_count-1)) { 00202 rt = pthread_cond_signal( &queue->cond_max ); 00203 if (rt) { 00204 perror("pthread_cond_signal failed"); 00205 } 00206 } 00207 00208 pthread_mutex_unlock( &queue->mutex ); 00209 00210 return ret; 00211 } 00212 00213 00214