00001 /* synQueue - a sychronized message queue for messages between 00002 * threads. */ 00003 00004 #include "common.h" 00005 #include "dlist.h" 00006 #include "pthreadWrap.h" 00007 #include "synQueue.h" 00008 00009 static char const rcsid[] = "$Id: synQueue.c,v 1.6 2006/03/11 23:07:02 kent Exp $"; 00010 00011 struct synQueue 00012 /* A synchronized queue for messages between threads. */ 00013 { 00014 struct synQueue *next; /* Next in list of queues. */ 00015 struct dlList *queue; /* The queue itself. */ 00016 pthread_mutex_t mutex; /* Mutex to prevent simultanious access. */ 00017 pthread_cond_t cond; /* Conditional to allow waiting until non-empty. */ 00018 }; 00019 00020 struct synQueue *synQueueNew() 00021 /* Make a new, empty, synQueue. */ 00022 { 00023 struct synQueue *sq; 00024 AllocVar(sq); 00025 pthreadMutexInit(&sq->mutex); 00026 pthreadCondInit(&sq->cond); 00027 sq->queue = dlListNew(); 00028 return sq; 00029 } 00030 00031 void synQueueFree(struct synQueue **pSq) 00032 /* Free up synQueue. Be sure no other threads are using 00033 * it first though! This will not free any dynamic memory 00034 * in the messages. Use synQueueFreeAndVals for that. */ 00035 { 00036 struct synQueue *sq = *pSq; 00037 if (sq == NULL) 00038 return; 00039 dlListFree(&sq->queue); 00040 pthreadCondDestroy(&sq->cond); 00041 pthreadMutexDestroy(&sq->mutex); 00042 freez(pSq); 00043 } 00044 00045 void synQueueFreeAndVals(struct synQueue **pSq) 00046 /* Free up synQueue. Be sure no other threads are using 00047 * it first though! This will freeMem all the messages */ 00048 { 00049 struct synQueue *sq = *pSq; 00050 if (sq == NULL) 00051 return; 00052 dlListFreeAndVals(&sq->queue); 00053 pthreadCondDestroy(&sq->cond); 00054 pthreadMutexDestroy(&sq->mutex); 00055 freez(pSq); 00056 } 00057 00058 void synQueuePut(struct synQueue *sq, void *message) 00059 /* Add message to end of queue. */ 00060 { 00061 pthreadMutexLock(&sq->mutex); 00062 dlAddValTail(sq->queue, message); 00063 pthreadCondSignal(&sq->cond); 00064 pthreadMutexUnlock(&sq->mutex); 00065 } 00066 00067 void *synQueueGet(struct synQueue *sq) 00068 /* Get message off start of queue. Wait until there is 00069 * a message if queue is empty. */ 00070 { 00071 void *message; 00072 struct dlNode *node; 00073 pthreadMutexLock(&sq->mutex); 00074 while (dlEmpty(sq->queue)) 00075 pthreadCondWait(&sq->cond, &sq->mutex); 00076 node = dlPopHead(sq->queue); 00077 pthreadMutexUnlock(&sq->mutex); 00078 message = node->val; 00079 freeMem(node); 00080 return message; 00081 } 00082 00083 void *synQueueGrab(struct synQueue *sq) 00084 /* Get message off start of queue. Return NULL immediately 00085 * if queue is empty. */ 00086 { 00087 void *message = NULL; 00088 struct dlNode *node; 00089 pthreadMutexLock(&sq->mutex); 00090 node = dlPopHead(sq->queue); 00091 pthreadMutexUnlock(&sq->mutex); 00092 if (node != NULL) 00093 { 00094 message = node->val; 00095 freeMem(node); 00096 } 00097 return message; 00098 } 00099 00100 int synQueueSize(struct synQueue *sq) 00101 /* Return number of messages currently on queue. */ 00102 { 00103 int size; 00104 pthreadMutexLock(&sq->mutex); 00105 size = dlCount(sq->queue); 00106 pthreadMutexUnlock(&sq->mutex); 00107 return size; 00108 }
1.5.2