EnsemblServer/ClientServerUtils.cpp

Go to the documentation of this file.
00001 
00002 // #######################################################################
00003 
00004 // SSAHA : Sequence Search and Alignment by Hashing Algorithm
00005 // Version 3.2, released 1st March 2004
00006 // Copyright (c) Genome Research 2002
00007 
00008 // SSAHA is free software; you can redistribute it and/or modify 
00009 // it under the terms of version 2 of the GNU General Public Licence
00010 // as published by the Free Software Foundation.
00011  
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public Licence for more details.
00016  
00017 // You should have received a copy of the GNU General Public Licence
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
00020 // or see the on-line version at http://www.gnu.org/copyleft/gpl.txt
00021 
00022 // #######################################################################
00023 
00024 // Module Name  : ClientServerUtils
00025 // File Name    : ClientServerUtils.cpp
00026 // Language     : C++
00027 // Module Author: Anthony J. Cox (ac2@sanger.ac.uk)
00028 
00029 // Description:
00030 
00031 // Includes:
00032 
#include <cerrno>
#include <cstdarg>
#include <cstdio>
#include <cstdlib>
#include <poll.h>
#include <sys/socket.h>
#include "MatchStore.h"
#include "ClientServerUtils.h"
00039 
00040 // ### Function Definitions ###
00041 
00042 MatchInfo::MatchInfo( const Match& m ) :
00043   subjectNum(m.getSubjectNum()),  
00044   subjectStart(m.getSubjectStart()),  
00045   subjectEnd(m.getSubjectEnd()),  
00046   queryNum(m.getQueryNum()),  
00047   // changed so as to report reverse match positions as seen from forward 
00048   // direction TC 13.12.2001
00049   //queryStart(m.getQueryStart()),  
00050   //queryEnd(m.getQueryEnd()),  
00051   queryStart
00052   ( m.isQueryForward()
00053     ? m.getQueryStart()
00054     : m.getQuerySize() - m.getQueryEnd() + 1 ),
00055   queryEnd
00056   ( m.isQueryForward()
00057     ? m.getQueryEnd()
00058     : m.getQuerySize() - m.getQueryStart() + 1 ),  
00059   numBases(m.getNumBases()),
00060   isQueryForward(m.isQueryForward()) ,
00061   isSubjectForward(m.isSubjectForward()) 
00062   {}
00063 
00064 
// Report a fatal error on standard output and terminate the process with
// exit status 1.
//
// fmt : printf-style format string; the variadic arguments that follow are
//       expanded into it. (Previously the arguments were ignored and the
//       raw format string was printed.)
// Does not return.
void
err_sys(const char *fmt, ...)
{
  printf("error: ");
  if (fmt != NULL)
  {
    va_list args;
    va_start(args, fmt);
    vprintf(fmt, args);
    va_end(args);
  }
  printf("\n");
  exit(1);
}
00071 
00072 
00073 
00074 int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr)
00075 {
00076   int             n;
00077 
00078  again:
00079   if ( (n = accept(fd, sa, salenptr)) < 0) 
00080   {
00081     if (errno == ECONNABORTED)
00082       goto again;
00083     else
00084       throw NetworkException
00085       ( (string)"accept error: " + (string)strerror(errno) ); 
00086   }
00087   return(n);
00088 } // ~Accept
00089 
00090 
00091 void
00092 Bind(int fd, const struct sockaddr *sa, socklen_t salen)
00093 {
00094   if (bind(fd, sa, salen) < 0) throw NetworkException
00095       ( (string)"bind error: " + (string)strerror(errno) ); 
00096 } // ~Bind
00097 
00098 void
00099 Listen(int fd, int backlog)
00100 {
00101   char  *ptr;
00102 
00103   /*4can override 2nd argument with environment variable */
00104   if ( (ptr = getenv("LISTENQ")) != NULL)
00105     backlog = atoi(ptr);
00106 
00107   if (listen(fd, backlog) < 0)
00108     throw NetworkException
00109       ( (string)"listen error: " + (string)strerror(errno) ); 
00110 } // ~Listen
00111 
00112 Sigfunc * signal(int signo, Sigfunc *func)
00113 {
00114   struct sigaction      act, oact;
00115   
00116   act.sa_handler = func;
00117   sigemptyset(&act.sa_mask);
00118   act.sa_flags = 0;
00119   if (signo == SIGALRM) {
00120 #ifdef  SA_INTERRUPT
00121     act.sa_flags |= SA_INTERRUPT;       /* SunOS 4.x */
00122 #endif
00123   } else {
00124 #ifdef  SA_RESTART
00125     act.sa_flags |= SA_RESTART;         /* SVR4, 44BSD */
00126 #endif
00127   }
00128   if (sigaction(signo, &act, &oact) < 0)
00129     return(SIG_ERR);
00130   return(oact.sa_handler);
00131 }
00132 /* end signal */
00133 
00134 Sigfunc * Signal(int signo, Sigfunc *func)  /* for our signal() function */
00135 {
00136   Sigfunc       *sigfunc;
00137   if ( (sigfunc = signal(signo, func)) == SIG_ERR) 
00138     throw NetworkException
00139       ( (string)"signal error: " + (string)strerror(errno) ); 
00140   return(sigfunc);
00141 }
00142 
00143 
00144 void Close(int fd)
00145 {  
00146   if (close(fd) == -1) 
00147   {
00148     throw NetworkException
00149       ( (string)"close error: " + (string)strerror(errno) ); 
00150   }
00151 }
00152 
00153 
00154 pid_t Fork(void)
00155 {
00156   pid_t pid;
00157   if ( (pid = fork()) == -1) throw NetworkException("fork error");
00158   return(pid);
00159 }
00160 
00161 int Socket(int family, int type, int protocol)
00162 {
00163   int           n;
00164   if ( (n = socket(family, type, protocol)) < 0) throw NetworkException("socket error");
00165   return(n);
00166 }
00167 
00168 
/* Read up to "n" bytes from a descriptor into vptr.
 *
 * Keeps calling read() until n bytes have arrived or end-of-file is seen;
 * a read() interrupted by a signal (EINTR) is simply repeated.
 * Returns the number of bytes actually stored (less than n only at EOF),
 * or -1 if read() fails for any reason other than EINTR. */
ssize_t readn(int fd, void *vptr, size_t n)
{
  char   *cursor = (char*) vptr;
  size_t  remaining = n;

  while (remaining > 0)
  {
    const ssize_t got = read(fd, cursor, remaining);

    if (got == 0)
      break;                            /* EOF */

    if (got < 0)
    {
      if (errno != EINTR)
        return(-1);
      continue;                         /* interrupted: call read() again */
    }

    remaining -= got;
    cursor    += got;
  }
  return(n - remaining);                /* return >= 0 */
}
00194 /* end readn */
00195 
00196 ssize_t Readn(int fd, void *ptr, size_t nbytes)
00197 {
00198   ssize_t               n;
00199   
00200   if ( (n = readn(fd, ptr, nbytes)) < 0)
00201     throw NetworkException("readn error");
00202   return(n);
00203 }
00204 
00205 
00206 #ifdef THIS_IS_A_NON_THREADSAFE_FUNCTION_SO_DONT_USE_IT
00207 static ssize_t my_read(int fd, char *ptr)
00208 {
00209   static int    read_cnt = 0;
00210   static char   *read_ptr;
00211   static char   read_buf[MAXLINE];
00212 
00213   cout << "@my read" << endl;
00214   
00215   if (read_cnt <= 0) 
00216   {
00217   again:
00218     if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) 
00219     {
00220       if (errno == EINTR)
00221         goto again;
00222       return(-1);
00223     } 
00224     else if (read_cnt == 0) return(0);
00225     cout << "@my read: got " << read_cnt << " bytes.\n";
00226     read_ptr = read_buf;
00227   }
00228   
00229   read_cnt--;
00230   *ptr = *read_ptr++;
00231   cout << "@my read:*ptr=" << *ptr << endl; 
00232   return(1);
00233 }
00234 
00235 ssize_t readline(int fd, void *vptr, size_t maxlen)
00236 {
00237   int           n, rc;
00238   char  c, *ptr;
00239   
00240   ptr = (char *) vptr;
00241   for (n = 1; n < maxlen; n++) {
00242     cout << "@readline, n=" << n << endl; // %%%%
00243     if ( (rc = my_read(fd, &c)) == 1) {
00244       *ptr++ = c;
00245       if ((c == '\n')||(c=='\0')) // modified to stop at end of string too
00246         break;  /* newline is stored, like fgets() */
00247     } else if (rc == 0) {
00248       if (n == 1)
00249         return(0);      /* EOF, no data read */
00250       else
00251         break;          /* EOF, some data was read */
00252     } else
00253       return(-1);               /* error, errno set by read() */
00254   }
00255   
00256   *ptr = 0;     /* null terminate like fgets() */
00257   return(n);
00258 }
00259 /* end readline */
00260 
00261 ssize_t Readline(int fd, void *ptr, size_t maxlen)
00262 {
00263   ssize_t               n;
00264   if ( (n = readline(fd, ptr, maxlen)) < 0) 
00265     throw NetworkException("readline error");
00266   return(n);
00267 }
00268 #endif
00269 
/* Write "n" bytes to a descriptor.
 *
 * Keeps calling write() until all n bytes from vptr have been written.
 * A write() interrupted by a signal (EINTR) is repeated.
 * Returns n on success, or -1 if write() fails for any other reason or
 * makes no progress (returns 0).
 *
 * errno is only consulted when write() actually failed: testing it after a
 * zero return could see a stale EINTR from an earlier call and loop
 * forever. */
ssize_t writen(int fd, const void *vptr, size_t n)
{
  const char    *ptr = (const char *) vptr;
  size_t        nleft = n;

  while (nleft > 0) {
    ssize_t nwritten = write(fd, ptr, nleft);

    if (nwritten <= 0) {
      if (nwritten < 0 && errno == EINTR)
        nwritten = 0;           /* and call write() again */
      else
        return(-1);             /* error */
    }

    nleft -= nwritten;
    ptr   += nwritten;
  }
  return(n);
}
00313 /* end writen */
00314 
00315 void Writen(int fd, void *ptr, size_t nbytes)
00316 {
00317 
00318 
00319   if (writen(fd, ptr, nbytes) != nbytes) 
00320   {
00321     //    cout << "Error no :"<< errno << endl;
00322     throw BrokenSocketException();
00323     //    else throw NetworkException("writen error");
00324   }
00325 }
00326 
00327 
00328 char* Fgets(char *ptr, int n, FILE *stream)
00329 {
00330   char  *rptr;
00331   if ( (rptr = fgets(ptr, n, stream)) == NULL && ferror(stream))
00332     throw NetworkException("fgets error");
00333   return (rptr);
00334 }
00335 
00336 void Fputs(const char *ptr, FILE *stream)
00337 {
00338   if (fputs(ptr, stream) == EOF) throw NetworkException("fputs error");
00339 }
00340 
00341 void Connect(int fd, const struct sockaddr *sa, socklen_t salen)
00342 {
00343   if (connect(fd, sa, salen) < 0) throw NetworkException("connect error");
00344 }
00345 
00346 
00347 
00348 // SocketInterface member functions
00349 
00350 void SocketInterface::sendSequence( const WordSequence& seq )
00351 {
00352     SequenceHeader sinfo;
00353     sinfo.size = seq.end() - seq.begin();
00354     sinfo.basesInLast = seq.getNumBasesInLast();
00355 
00356     sendStruct(&sinfo);
00357     Writen(portNum_, (void*)&(*seq.begin()), sinfo.size*sizeof(Word));
00358 
00359     //    bytesSent_+=sinfo.size*sizeof(Word)+sizeof(sinfo);
00360     //   cout << "@sent seq " << sinfo.size*sizeof(Word)+sizeof(sinfo) 
00361     //   << " bytes, " << bytesSent_ << "total.\n";
00362 
00363 }
00364 
00365 
00366 void SocketInterface::receiveSequence( WordSequence& seq )
00367 {
00368   SequenceHeader sinfo;
00369   receiveStruct(&sinfo);
00370   //  cout << "receiveSeq: " << sinfo.size << " " << sinfo.basesInLast << endl;
00371   seq.resize(sinfo.size);
00372 
00373   copy((char*)&(*seq.begin()), sinfo.size*sizeof(Word));
00374   seq.setNumBasesInLast(sinfo.basesInLast);
00375 
00376   bytesRead_+=sinfo.size*sizeof(Word)+sizeof(sinfo);
00377   //  cout << "@read seq " << sinfo.size*sizeof(Word)+sizeof(sinfo) 
00378   //       << " bytes, " << bytesRead_ << "total.\n";
00379 }
00380 
00381 void SocketInterface::sendString( const string& s )
00382 {
00383   Writen(portNum_, (void*)(s.c_str()), s.size()+1);
00384   bytesSent_+=s.size()+1;
00385   //  cout << "@send string " << s.size()+1 << " bytes, " << bytesSent_
00386   //     << "total.\n";
00387   // +1 cos we need to send the \0 at the end of the string
00388 }
00389 
00390 void SocketInterface::sendChars( const char* pChar, int numChars )
00391 {
00392   static const char c('\0');
00393   Writen(portNum_, (void*)pChar, numChars);
00394   Writen(portNum_, (void*)&c, 1);
00395   bytesSent_+=numChars+1;
00396 }
00397 
00398 
00399 void SocketInterface::receiveString( string& s )
00400 {
00401   s="";
00402   char c;
00403   while(1)
00404   {
00405     receiveStruct(&c);
00406     if((c=='\0')||(c=='\n')) return;
00407     s+=c;
00408   } 
00409 }
00410 
00411 void SocketInterface::receiveChars( vector<char>& v )
00412 {
00413   v.clear();
00414   char c;
00415   while(1)
00416   {
00417     receiveStruct(&c);
00418     if((c=='\0')||(c=='\n')) return;
00419     v.push_back(c);
00420   } 
00421 }
00422 
00423 
00424 //string SocketInterface::receiveString( void )
00425 //{
00426 //  int a(Readline(portNum_, buffer_, MAXLINE));
00427 //  bytesRead_+=a;
00428 //  cout << "@receive string " << a << " bytes, " << bytesRead_
00429 //       << "total.\n";
00430   // == 0)
00431   //     throw NetworkException("Data stream from server ended prematurely!");
00432   //  if (Readline(portNum_, buffer_, MAXLINE) == 0)
00433   //     throw NetworkException("Data stream from server ended prematurely!");
00434 //  return string(buffer_);
00435 //}
00436 
00437 void SocketInterface::checkSocketEmpty( void )
00438 {
00439   
00440   //  Linux and other Unixes doesn't have MSG_NONBLOCK as an argument to recv. Workaround by DJC.
00441   
00442 
00443   //  Is the socket blocking at the moment?
00444   bool blocking;
00445   
00446   int flags=fcntl(portNum_,F_GETFL,0);
00447   int newflags=flags;
00448 
00449   if (flags==-1) 
00450     throw NetworkException("Problem reading status (fcntl) of socket");
00451   
00452 
00453   // Make it non-blocking
00454   newflags|=O_NONBLOCK;
00455   fcntl(portNum_,F_SETFL,newflags);
00456 
00457   // See if we can read data
00458   int n(recv( portNum_, buffer_, MAXLINE, MSG_PEEK));
00459  
00460   // Put the socket back to how it was before (blocking or non-blocking)
00461   fcntl(portNum_,F_SETFL,flags); 
00462 
00463   //perror("error status = ");
00464   if (errno!=EWOULDBLOCK) 
00465     {
00466       //    cout << "There are " << n << " bytes waiting to be read.\n";
00467       if (n>0) throw NetworkException("Unexpected data at socket!");
00468     }
00469 
00470 }
00471 
00472 void SocketInterface::copy( char* target, int numBytes)
00473 {
00474   //  cout << "copy - requested " << numBytes << ", got " << size() << endl;
00475   if (size()<numBytes) getAtLeast(numBytes-size());
00476 
00477   for( int i(0) ; i < numBytes ; ++ i )
00478     { *target++ = front(); pop_front(); }
00479 
00480   //  cout << "final size " << size() << endl;
00481 }
00482 
00483 void SocketInterface::getAtLeast( int numBytes )
00484 {
00485 
00486   //  cout << "  getAtLeast " << numBytes << " bytes\n";
00487 
00488   int numRead(0);
00489   int totalNumRead(0); 
00490   int numTries(0);
00491   fd_set fds;
00492   FD_ZERO(&fds);
00493   FD_SET(portNum_, &fds);
00494   
00495   while(1)
00496   {
00497     // bit of a cheat using numRead twice here ...
00498     numRead=select(portNum_+1, &fds, NULL, NULL, &timeOut_ );
00499     if (numRead==0)
00500     {
00501       throw NetworkException
00502         ("Timed out getting data from socket: " + (string)strerror(errno) ); 
00503     } // ~if
00504     else if (numRead<0)
00505     {
00506       throw NetworkException
00507         ("Select error getting data from socket: " + (string)strerror(errno) ); 
00508     } // ~else if
00509 
00510     //    numRead=read(portNum_,(void*)&buffer_[0],MaxChunkSize);
00511     numRead=recv(portNum_,(void*)&buffer_[0],MaxChunkSize,0);
00512     if (numRead==0)
00513     {
00514       if (++numTries==100)
00515       throw NetworkException
00516         ("Reading zero bytes from socket, probable peer disconnect: " 
00517          + (string)strerror(errno) ); 
00518     }    
00519     if ( numRead < 0 )
00520     {
00521       if (errno==EINTR) continue;
00522       else 
00523       {
00524         //      throw NetworkException("getAtLeast error");
00525         throw NetworkException
00526         ( (string)"getAtLeast error: " + (string)strerror(errno) ); 
00527       } // ~else
00528     } // ~if
00529     //    cout << " got " << numRead << " so far...\n";
00530     insert(end(),&buffer_[0],&buffer_[numRead]);
00531     totalNumRead +=numRead;
00532     if (totalNumRead>=numBytes) break;
00533 
00534   } // ~while
00535 
00536   //  cout << "  total got " << totalNumRead << endl;
00537 
00538 }
00539 
00540 
00541 
00542 
00543 // Name:
00544 // Arguments:
00545 // TYPE  NAME  IN/OUT COMMENT
00546 // Returns: TYPE COMMENT
00547 
00548 // End of file ClientServerUtils.cpp
00549 
00550 

Generated on Fri Dec 21 13:12:15 2007 for ssaha by  doxygen 1.5.2