lib/rudp.c

Go to the documentation of this file.
00001 /* rudp - (semi) reliable UDP communication.  This adds an
00002  * acknowledgement and resend layer on top of UDP. 
00003  *
00004  * UDP is a packet based rather than stream based internet communication 
00005  * protocol. Messages sent by UDP are checked for integrity by the UDP layer, 
00006  * and discarded if transmission errors are detected.  However packets are
00007  * not necessarily received in the same order that they are sent,
00008  * and packets may be duplicated or lost.  
00009  
00010  * With rudp, packets are only very rarely lost, and the sender is 
00011  * notified if they are.  After rudp there are still duplicate
00012  * packets that may arrive out of order.  Aside from the duplicates
00013  * the packets are in order though.
00014  *
00015  * For many, perhaps most applications, TCP/IP is a saner choice
00016  * than UDP or rudp.  If the communication channel is between just 
00017  * two computers you can pretty much just treat TCP/IP as a fairly
00018  * reliable pipe.   However if the communication involves many
00019  * computers sometimes UDP can be a better choice.  It is possible to
00020  * do broadcast and multicast with UDP but not with TCP/IP.  Also
00021  * for systems like parasol, where a server may be making and breaking
00022  * connections rapidly to thousands of computers, TCP paradoxically
00023  * can end up less reliable than UDP.  Though TCP is relatively 
00024  * robust when a connection is made,  it can relatively easily fail
00025  * to make a connection in the first place, and spend quite a long
00026  * time figuring out that the connection can't be made.  Moreover at
00027  * the end of each connection TCP goes into a 'TIME_WAIT' state,  which
00028  * prevents another connection from coming onto the same port for a
00029  * time that can be as long as 255 seconds.  Since there are only
00030  * about 15000 available ports,  this limits TCP/IP to 60 connections
00031  * per second in some cases.  Generally the system does not handle
00032  * running out of ports gracefully, and this did occur with the 
00033  * TCP/IP based version of Parasol.
00034  *
00035  * This module puts a thin layer around UDP to make it a little bit more
00036  * reliable.  Currently the interface is geared towards Parasol rather
00037  * than broadcast type applications.  This module will try to send
00038  * a message a limited number of times before giving up.  It puts
00039  * a small header containing a message ID and timestamp on each message.   
00040  * This header is echoed back in acknowledgment by the receiver. This
00041  * echo lets the sender know if it needs to resend the message, and
00042  * lets it know how long a message takes to get to the destination and
00043  * back.  It uses this round trip time information to figure out how
00044  * long to wait between resends. 
00045  *
00046  * Much of this code is based on the 'Adding Reliability to a UDP Application'
00047  * section in volume I, chapter 20, section 5, of _UNIX Network Programming_
00048  * by W. Richard Stevens. */
00049 
00050 
#include "common.h"
#include <limits.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "errabort.h"
#include "rudp.h"
00057 
static char const rcsid[] = "$Id: rudp.c,v 1.14 2005/12/12 02:24:47 kent Exp $";

/* Upper bound on the retransmission time out, in microseconds (just under
 * one second).  Both the computed time out (rudpCalcTimeOut) and the
 * exponential back off (rudpTimedOut) are clamped to this value. */
#define MAX_TIME_OUT 999999
00061 
00062 static int rudpCalcTimeOut(struct rudp *ru)
00063 /* Return calculation of time out based on current data. */
00064 {
00065 int timeOut = ru->rttAve + (ru->rttVary<<2);
00066 if (timeOut > MAX_TIME_OUT) timeOut = MAX_TIME_OUT; /* No more than a second. */
00067 if (timeOut < 10000) timeOut = 10000;   /* No less than 1/100th second */
00068 return timeOut;
00069 }
00070 
00071 static void rudpAddRoundTripTime(struct rudp *ru, int time)
00072 /* Add info about round trip time and based on this recalculate
00073  * time outs. */
00074 {
00075 int delta;
00076 ru->rttLast = time;
00077 delta = time - ru->rttAve;
00078 ru->rttAve += (delta>>3);                     /* g = 1/8 */
00079 if (delta < 0) delta = -delta;
00080 ru->rttVary += ((delta - ru->rttVary) >> 2);  /* h = 1/4 */
00081 ru->timeOut = rudpCalcTimeOut(ru);
00082 }
00083 
00084 static void rudpTimedOut(struct rudp *ru)
00085 /* Tell system about a time-out. */
00086 {
00087 ru->timeOut <<=  1;   /* Back off exponentially. */
00088 if (ru->timeOut >= MAX_TIME_OUT)
00089     ru->timeOut = MAX_TIME_OUT;
00090 }
00091 
00092 struct rudp *rudpNew(int socket)
00093 /* Wrap a rudp around a socket. Call rudpFree when done, or
00094  * rudpClose if you also want to close(socket). */
00095 {
00096 struct rudp *ru;
00097 assert(socket >= 0);
00098 AllocVar(ru);
00099 ru->socket = socket;
00100 ru->rttVary = 250;      /* Initial variance 250 microseconds. */
00101 ru->timeOut = rudpCalcTimeOut(ru);
00102 ru->maxRetries = 7;
00103 return ru;
00104 }
00105 
void rudpFree(struct rudp **pRu)
/* Free up rudp.  Note this does *not* close the associated socket; use
 * rudpClose for that.  Freeing goes through freez(), which presumably
 * also sets *pRu to NULL (rudpOpenBound relies on that) - confirm in
 * common. */
{
freez(pRu);
}
00111 
struct rudp *rudpOpen()
/* Open up an unbound rudp on a fresh UDP socket.  This is suitable for
 * writing to and for reading responses; use rudpOpenBound instead if you
 * want to listen for incoming messages.  Call rudpClose() when done.
 * Warns and returns NULL if the socket can't be created. */
{
int fd = socket(AF_INET,  SOCK_DGRAM, IPPROTO_UDP);
if (fd >= 0)
    return rudpNew(fd);
warn("Couldn't open socket in rudpOpen %s", strerror(errno));
return NULL;
}
00128 
00129 struct rudp *rudpMustOpen()
00130 /* Open up unbound rudp.  Warn and die if there is a problem. */
00131 {
00132 struct rudp *ru = rudpOpen();
00133 if (ru == NULL)
00134     noWarnAbort();
00135 return ru;
00136 }
00137 
00138 struct rudp *rudpOpenBound(struct sockaddr_in *sai)
00139 /* Open up a rudp socket bound to a particular port and address.
00140  * Use this rather than rudpOpen if you want to wait for
00141  * messages at a specific address in a server or the like. */
00142 {
00143 struct rudp *ru = rudpOpen();
00144 if (ru != NULL)
00145     {
00146     if (bind(ru->socket, (struct sockaddr *)sai, sizeof(*sai)) < 0)
00147         {
00148         warn("Couldn't bind rudp socket: %s", strerror(errno));
00149         rudpClose(&ru);
00150         }
00151     }
00152 return ru;
00153 }
00154 
00155 struct rudp *rudpMustOpenBound(struct sockaddr_in *sai)
00156 /* Open up a rudp socket bound to a particular port and address
00157  * or die trying. */
00158 {
00159 struct rudp *ru = rudpOpenBound(sai);
00160 if (ru == NULL)
00161     noWarnAbort();
00162 return ru;
00163 }
00164 
00165 void rudpClose(struct rudp **pRu)
00166 /* Close socket and free memory. */
00167 {
00168 struct rudp *ru = *pRu;
00169 if (ru != NULL)
00170     {
00171     close(ru->socket);
00172     freez(pRu);
00173     }
00174 }
00175 
static int timeDiff(struct timeval *t1, struct timeval *t2)
/* Return the difference in microseconds between t1 and t2.  t2 should be
 * later than t1; if it is earlier (e.g. the clock was stepped back) a
 * warning is printed and 0 is returned.  Differences too large for an int
 * (a little over 35 minutes) are clamped to INT_MAX instead of
 * overflowing. */
{
/* Work in 64 bits: tv_sec is a time_t, and seconds * 1000000 overflows a
 * 32-bit int after about 2147 seconds (signed overflow is undefined). */
long long microDiff =
    ((long long)t2->tv_sec - (long long)t1->tv_sec) * 1000000LL
    + ((long long)t2->tv_usec - (long long)t1->tv_usec);
if (microDiff < 0)
    {
    /* Note, this case actually happens, currently particularly on
     * kkr2u62 and kkr8u19.  I think this is just a bug in their clock
     * hardware/software.  However in general it _could_ happen very
     * rarely on normal machines when the clock is reset by the
     * network time protocol thingie. */
    warn("t1 %u.%u, t2 %u.%u.  t1 > t2 but later?!", (unsigned)t1->tv_sec,
         (unsigned)t1->tv_usec, (unsigned)t2->tv_sec, (unsigned)t2->tv_usec);
    return 0;
    }
if (microDiff > INT_MAX)
    return INT_MAX;
return (int)microDiff;
}
00198 
00199 static boolean readReadyWait(int sd, int microseconds)
00200 /* Wait for descriptor to have some data to read, up to
00201  * given number of microseconds. */
00202 {
00203 struct timeval tv;
00204 fd_set set;
00205 int readyCount;
00206 
00207 for (;;)
00208     {
00209     if (microseconds > 1000000)
00210         {
00211         tv.tv_sec = microseconds/1000000;
00212         tv.tv_usec = microseconds%1000000;
00213         }
00214     else
00215         {
00216         tv.tv_sec = 0;
00217         tv.tv_usec = microseconds;
00218         }
00219     FD_ZERO(&set);
00220     FD_SET(sd, &set);
00221     readyCount = select(sd+1, &set, NULL, NULL, &tv);
00222     if (readyCount < 0) 
00223         {
00224         if (errno == EINTR)     /* Select interrupted, not timed out. */
00225             continue;
00226         else 
00227             warn("select failure in rudp: %s", strerror(errno));
00228         }
00229     else
00230         {
00231         return readyCount > 0;  /* Zero readyCount indicates time out */
00232         }
00233     }
00234 }
00235 
00236 static boolean getOurAck(struct rudp *ru, struct timeval *startTv)
00237 /* Wait for acknowledgement to the message we just sent.
00238  * The set should be zeroed out. Only wait for up to
00239  * ru->timeOut microseconds.   Prints a message and returns FALSE
00240  * if there's a problem.   */
00241 {
00242 struct rudpHeader head;
00243 int readSize;
00244 int timeOut = ru->timeOut;
00245 
00246 for (;;)
00247     {
00248     /* Set up select with our time out. */
00249     int dt;
00250     struct timeval tv;
00251 
00252     if (readReadyWait(ru->socket, timeOut))
00253         {
00254         /* Read message and if it's our ack return true.   */
00255         readSize = recvfrom(ru->socket, &head, sizeof(head), 0, NULL, NULL);
00256         if (readSize >= sizeof(head) && head.type == rudpAck && head.id == ru->lastId)
00257             {
00258             gettimeofday(&tv, NULL);
00259             dt = timeDiff(startTv, &tv);
00260             rudpAddRoundTripTime(ru, dt);
00261             return TRUE;
00262             }
00263         }
00264 
00265     /* If we got to here then we did get a message, but it's not our
00266      * ack.  We ignore the message and loop around again,  but update
00267      * our timeout so that we won't keep getting other people's messages
00268      * forever. */
00269     gettimeofday(&tv, NULL);
00270     timeOut = ru->timeOut - timeDiff(startTv, &tv);
00271     if (timeOut <= 0)
00272         return FALSE;
00273     }
00274 }
00275 
00276 int rudpSend(struct rudp *ru, struct sockaddr_in *sai, void *message, int size)
00277 /* Send message of given size to port at host via rudp.  Prints a warning and
00278  * sets errno and returns -1 if there's a problem. */
00279 {
00280 struct timeval sendTv;  /* Current time. */
00281 
00282 char outBuf[udpEthMaxSize];
00283 struct rudpHeader *head;
00284 int fullSize = size + sizeof(*head);
00285 int i, err = 0, maxRetry = ru->maxRetries;
00286 
00287 
00288 /* Make buffer with header in front of message. 
00289  * At some point we might replace this with a scatter/gather
00290  * iovector. */
00291 ru->sendCount += 1;
00292 assert(size <= rudpMaxSize);
00293 head = (struct rudpHeader *)outBuf;
00294 memcpy(head+1, message, size);
00295 head->id = ++ru->lastId;
00296 head->type = rudpData;
00297 
00298 /* Go into send/wait for ack/retry loop. */
00299 for (i=0; i<maxRetry; ++i)
00300     {
00301     gettimeofday(&sendTv, NULL);
00302     head->sendSec = sendTv.tv_sec;
00303     head->sendMicro = sendTv.tv_usec;
00304     err =  sendto(ru->socket, outBuf, fullSize, 0, 
00305         (struct sockaddr *)sai, sizeof(*sai));
00306     if (err < 0) 
00307         {
00308         /* Warn, wait, and retry. */
00309         struct timeval tv;
00310         warn(" sendto problem %s", strerror(errno));
00311         tv.tv_sec = 0;
00312         tv.tv_usec = ru->timeOut;
00313         select(0, NULL, NULL, NULL, &tv);
00314         ru->resendCount += 1;
00315         rudpTimedOut(ru);
00316         continue;
00317         }
00318     if (getOurAck(ru, &sendTv))
00319         {
00320         return 0;
00321         }
00322     rudpTimedOut(ru);
00323     ru->resendCount += 1;
00324     }
00325 if (err >= 0)
00326     {
00327     err = ETIMEDOUT;
00328     warn("rudpSend timed out");
00329     }
00330 ru->failCount += 1;
00331 return err;
00332 }
00333 
00334 
00335 int rudpReceiveTimeOut(struct rudp *ru, void *messageBuf, int bufSize, 
00336         struct sockaddr_in *retFrom, int timeOut)
00337 /* Read message into buffer of given size.  Returns actual size read on
00338  * success. On failure prints a warning, sets errno, and returns -1. 
00339  * Also returns ip address of message source. If timeOut is nonzero,
00340  * it represents the timeout in milliseconds.  It will set errno to
00341  * ETIMEDOUT in this case.*/
00342 {
00343 char inBuf[udpEthMaxSize];
00344 struct rudpHeader *head = (struct rudpHeader *)inBuf;
00345 struct rudpHeader ackHead;
00346 struct sockaddr_in sai;
00347 socklen_t saiSize = sizeof(sai);
00348 int readSize, err;
00349 assert(bufSize <= rudpMaxSize);
00350 ru->receiveCount += 1;
00351 for (;;)
00352     {
00353     if (timeOut != 0)
00354         {
00355         if (!readReadyWait(ru->socket, timeOut))
00356             {
00357             warn("rudpReceive timed out\n");
00358             errno = ETIMEDOUT;
00359             return -1;
00360             }
00361         }
00362     readSize = recvfrom(ru->socket, inBuf, sizeof(inBuf), 0, 
00363         (struct sockaddr*)&sai, &saiSize);
00364     if (retFrom != NULL)
00365         *retFrom = sai;
00366     if (readSize < 0)
00367         {
00368         if (errno == EINTR)
00369             continue;
00370         warn("recvfrom error: %s", strerror(errno));
00371         ru->failCount += 1;
00372         return readSize;
00373         }
00374     if (readSize < sizeof(*head))
00375         {
00376         warn("rudpRecieve truncated message");
00377         continue;
00378         }
00379     if (head->type != rudpData)
00380         {
00381         if (head->type != rudpAck)
00382             warn("skipping non-data message %d in rudpReceive", head->type);
00383         continue;
00384         }
00385     ackHead = *head;
00386     ackHead.type = rudpAck;
00387     err = sendto(ru->socket, &ackHead, sizeof(ackHead), 0, 
00388         (struct sockaddr *)&sai, sizeof(sai));
00389     if (err < 0)
00390         {
00391         warn("problem sending ack in rudpRecieve: %s", strerror(errno));
00392         }
00393     readSize -= sizeof(*head);
00394     if (readSize > bufSize)
00395         {
00396         warn("read more bytes than have room for in rudpReceive");
00397         readSize = bufSize;
00398         }
00399     memcpy(messageBuf, head+1, readSize);
00400     break;
00401     }
00402 return readSize;
00403 }
00404 
int rudpReceiveFrom(struct rudp *ru, void *messageBuf, int bufSize, 
        struct sockaddr_in *retFrom)
/* Blocking receive: wait as long as necessary for a data message, copy
 * it into messageBuf (at most bufSize bytes), and return its size and
 * sender.  On failure prints a warning, sets errno, and returns -1. */
{
const int waitForever = 0;      /* Zero time out means no time limit. */
int size = rudpReceiveTimeOut(ru, messageBuf, bufSize, retFrom, waitForever);
return size;
}
00413 
int rudpReceive(struct rudp *ru, void *messageBuf, int bufSize)
/* Blocking receive that discards the sender's address.  Returns the
 * message size on success; on failure prints a warning, sets errno, and
 * returns -1. */
{
return rudpReceiveTimeOut(ru, messageBuf, bufSize, NULL, 0);
}
00420 
00421 void rudpPrintStatus(struct rudp *ru)
00422 /* Print out rudpStatus */
00423 {
00424 printf("rudp status:\n");
00425 printf("  receiveCount %d\n", ru->receiveCount);
00426 printf("  sendCount %d\n", ru->sendCount);
00427 printf("  resendCount %d\n", ru->resendCount);
00428 printf("  failCount %d\n", ru->failCount);
00429 printf("  timeOut %d\n", ru->timeOut);
00430 printf("  rttVary %d\n", ru->rttVary);
00431 printf("  rttAve %d\n", ru->rttAve);
00432 printf("  rttLast %d\n", ru->rttLast);
00433 }
00434 
00435 void rudpTest()
00436 /* Test out rudp stuff. */
00437 {
00438 static int times[] = {1000, 200, 200, 100, 200, 200, 200, 400, 200, 200, 200, 200, 1000, 
00439         200, 200, 200, 200};
00440 struct rudp *ru = rudpNew(0);
00441 int i;
00442 
00443 for (i=0; i<ArraySize(times); ++i)
00444     {
00445     int oldTimeOut = ru->timeOut;
00446     rudpAddRoundTripTime(ru, times[i]);
00447     printf("%d\t%d\t%d\t%d\n", i, oldTimeOut, times[i], ru->timeOut);
00448     }
00449 }

Generated on Tue Dec 25 18:39:31 2007 for blat by  doxygen 1.5.2