EnsemblServer/SSAHAServer.cpp

Go to the documentation of this file.
00001 
00002 // #######################################################################
00003 
00004 // SSAHA : Sequence Search and Alignment by Hashing Algorithm
00005 // Version 3.2, released 1st March 2004
00006 // Copyright (c) Genome Research 2002
00007 
00008 // SSAHA is free software; you can redistribute it and/or modify 
00009 // it under the terms of version 2 of the GNU General Public Licence
00010 // as published by the Free Software Foundation.
00011  
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public Licence for more details.
00016  
00017 // You should have received a copy of the GNU General Public Licence
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
00020 // or see the on-line version at http://www.gnu.org/copyleft/gpl.txt
00021 
00022 // #######################################################################
00023 
00024 // Module Name  : SSAHAServer
00025 // File Name    : SSAHAServer.cpp
00026 // Language     : C++
00027 // Module Author: Anthony J. Cox (ac2@sanger.ac.uk)
00028 
00029 // Description:
00030 
00031 // Includes:
00032 
00033 #include "ClientServerUtils.h"
00034 #include "SSAHAServer.h"
00035 #include "HashTableGeneric.h"
00036 #include "SequenceReaderLocal.h"
00037 #include "QueryManager.h"
00038 #include "MatchStoreUngapped.h"
00039 #include <string.h>
00040 #include <string>
00041 #include <iomanip>
00042 #include <sys/resource.h>
00043 #include <cmath>
00044 
00045 // ### Function Definitions ###
00046 
00047 static Handshake hello;
00048 static pid_t myPID;
00049 static HashTableGeneric* pHashTable = NULL; 
00050 static SourceReaderIndex* pSourceReader = NULL;
00051 static double expectedNumHits;
00052 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
00053 // static pthread_rwlock_t rwlock;
00054 
00055 typedef MatchTaskSort
00056 <SortByMatch>
00057 SortByMatchType;
00058 
00059 //typedef MatchTaskSort
00060 //<CombineSort<SortByMatchSize,SortByDiff,SortByQueryStart> >
00061 //SortByMatchType;
00062 
00063 typedef MatchTaskSort
00064 <CombineSort<SortByPercentMatch,SortByDiff,SortByQueryStart> >
00065 SortByPercentType;
00066 
// How main() services an accepted connection: in a forked child
// process, in a new thread, or directly in the master process.
enum ServerModeType
{
  eFork,
  eThreads,
  eSingle
};

// Sequence name files larger than this many bytes (100 million) are
// not stored locally by generateHashTable().
enum
{
  tooLargeNameFileSize = 100000000
};
00069 
00070 void resourceUsage( void )
00071 {
00072   static struct rusage usage1, usage2;
00073   static rusage* pUsage(&usage1);
00074 
00075   getrusage (RUSAGE_SELF, pUsage);
00076 
00077 
00078   //  printf("%d: %d.%03du %d.%03ds %d+%dk %dpf %dsw\n",
00079   //     myPID,
00080   // usage2.ru_utime.tv_sec,
00081   // usage2.ru_utime.tv_usec/1000,
00082   // usage2.ru_stime.tv_sec,
00083   // usage2.ru_stime.tv_usec/1000,
00084   // usage2.ru_ixrss,
00085   // usage2.ru_idrss + usage2.ru_isrss,
00086   // usage2.ru_majflt,
00087   // usage2.ru_nswap);
00088 
00089   if (pUsage==&usage2)
00090   {
00091     cout << myPID << ": CPU microseconds used: \t"
00092          <<   ((usage2.ru_utime.tv_sec * 1000000) + usage2.ru_utime.tv_usec)
00093       - ((usage1.ru_utime.tv_sec * 1000000) + usage1.ru_utime.tv_usec)
00094          << " (user) \t"
00095          <<   ((usage2.ru_stime.tv_sec * 1000000) + usage2.ru_stime.tv_usec)
00096       - ((usage1.ru_stime.tv_sec * 1000000) + usage1.ru_stime.tv_usec)
00097          << " (system)" << endl;
00098     pUsage=&usage1;
00099     
00100   }
00101  else pUsage=&usage2;
00102 
00103 
00104 } // ~void resourceUsage( void )
00105 
00106 
00107 void sig_chld( int signo )
00108 {
00109   pid_t pid;
00110   int           stat;
00111   
00112   while ( (pid = waitpid(-1, &stat, WNOHANG)) > 0)
00113     cout << myPID << ": terminated child " << pid << endl;
00114     //    printf("child %d terminated\n", pid);
00115   return;
00116 }
00117 
00118 void sig_pipe( int signo )
00119 {
00120   cout << myPID << ": broken connection" << endl;
00121   //  throw BrokenSocketException();
00122 }
00123 
00124 
00125 
00126 
00127 
00128 class MatchTaskServer : public MatchTask
00129 {
00130 public:
00131   MatchTaskServer( void ) : sendSequence_(false) {} // SocketInterface&
00132   virtual void operator()(MatchStore& store);
00133   void sendMatches( SocketInterface& socket );
00134   void sendSequence( bool yesOrNo=true ) { sendSequence_ = yesOrNo; }
00135 private:
00136   vector<MatchInfo> matches_;
00137   map<SequenceNumber,std::string> names_;
00138   bool sendSequence_;
00139 };
00140 
00141 void MatchTaskServer::operator()(MatchStore& store)
00142 {
00143 
00144  for (MatchStore::iterator i( store.begin() ) ; i!=store.end() ; ++i )
00145  {
00146     matches_.push_back( MatchInfo( **i ) );
00147     names_.insert
00148     ( pair<SequenceNumber, string>
00149       ( (*i)->getSubjectNum(), (*i)->getSubjectName() ) );
00150  }
00151 }
00152 
00153 void MatchTaskServer::sendMatches( SocketInterface& socket )
00154 {
00155 
00156   MatchHeader header;
00157   header.wasSuccessful=true; // TBD enum with more descriptive stuff
00158   header.numSubjectNames=names_.size();
00159   header.numMatches=matches_.size();
00160   cout << myPID << ": sending " << header.numMatches << " matches among "
00161        << header.numSubjectNames << " subject sequences." << endl; 
00162   
00163   socket.sendStruct(&header);
00164 
00165   char endString('\0');
00166   char* pSource;
00167 
00168   // Send the subject nums and names of all database sequences
00169   // which matched the query
00170   for ( map<SequenceNumber,std::string>::iterator i(names_.begin()); i != names_.end() ; i++  ) {
00171     cout << myPID << ": " << i->first << " " << i->second << endl;
00172     socket.sendStruct(&i->first);
00173     socket.sendString(i->second);
00174   } 
00175 
00176   // Send the match info itself
00177   for ( vector<MatchInfo>::iterator i( matches_.begin()); i != matches_.end() ; ++i ) {  
00178     
00179     socket.sendStruct((MatchInfo*)&(*i));
00180     
00181     if (sendSequence_==true) {
00182       //      cout << myPID << ": getting ready to send sequence " 
00183       //   << i->subjectNum << " " 
00184       //   << i->subjectStart << " " 
00185       //   << i->subjectEnd << endl;
00186       if(pSourceReader!=NULL) {
00187         pSourceReader->extractSource ( &pSource, i->subjectNum, i->subjectStart, i->subjectEnd );
00188         socket.sendChars( pSource, i->subjectEnd-i->subjectStart+1 );
00189         //cout << myPID << ": sent " 
00190         //   << i->subjectEnd-i->subjectStart+1 << " chars, "
00191         //   << pSource[0]
00192         //   << pSource[i->subjectEnd-i->subjectStart] << endl;
00193       } else {
00194         // send a null string if no index info is available
00195         socket.sendStruct(&endString);
00196         //      cout << myPID << ": sent null string" << endl; 
00197       }
00198     
00199     } 
00200   
00201   } 
00202 
00203 }
00204 
00205 
00206 void processQuery(int sockfd, HashTableGeneric& hashTable, const string& myPID)
00207 {
00208 
00209   SocketInterface socket(sockfd,60);
00210   socket.sendStruct(&hello);
00211 
00212   //  cout << myPID << ": sent handshake " << sizeof(hello) << endl;
00213  
00214   QueryHeader qinfo;
00215   socket.receiveStruct(&qinfo);
00216 
00217   //  cout << myPID << ": received query header" << endl;
00218 
00219   cout << myPID << ": Client request: " << qinfo << endl;
00220 
00221   MatchHeader rejectQuery;
00222   rejectQuery.wasSuccessful=false; // TBD enum with more descriptive stuff
00223   rejectQuery.numSubjectNames=0;
00224   rejectQuery.numMatches=0;
00225 
00226   if( qinfo.numQueryWords>hello.maxBufferSize )
00227   {
00228     cout << myPID << ": Rejecting client request (too large)." << endl;
00229     socket.sendStruct(&rejectQuery);
00230     return;
00231   }
00232 
00233   int incomingWordLength;
00234   
00235   if( qinfo.bitsPerSymbol == hashTable.getBitsPerSymbol() )
00236   {
00237     incomingWordLength = hashTable.getWordLength();
00238   } 
00239   else if (    (qinfo.bitsPerSymbol==gBaseBits)
00240             && (hashTable.getBitsPerSymbol()==gResidueBits) )
00241   {
00242     incomingWordLength = gMaxBasesPerWord;
00243   } // ~else if
00244   else
00245   {
00246     cout << myPID << ": Rejecting client request (can't accept queries with "
00247          << qinfo.bitsPerSymbol << " bits per symbol)." << endl;
00248     socket.sendStruct(&rejectQuery);
00249     return;
00250   } // ~if
00251 
00252   cout << myPID << ": Expecting data with " << qinfo.bitsPerSymbol 
00253      << " bits per symbol, " << incomingWordLength 
00254      << " symbols per word." << endl;
00255 
00256   SequenceHeader sinfo;
00257 
00258   SequenceReaderLocal querySeqs
00259     ( incomingWordLength, qinfo.bitsPerSymbol, cerr );
00260 
00261   for ( int i(0) ; i < qinfo.numQuerySeqs ; i++ )
00262   {
00263     querySeqs.push_back();
00264     socket.receiveSequence(querySeqs.back().first);
00265     // TBD check we're not exceeding our limit
00266     //  cout << printWord(query.back(), hello.wordLength) << endl;
00267   }
00268 
00269 
00270   socket.checkSocketEmpty();
00271   //  cout << myPID << querySeqs.getNumSequencesInFile() << endl;
00272 
00273   // Now do the match
00274   
00275   // TBD next line is not thread safe!!! - move inside mutex!
00276   //  hashTable.setMaxNumHits
00277   //  (
00278   //   (qinfo.clipThreshold==0) 
00279   //   ? (1<<30) 
00280   //   : 1+(int)(qinfo.clipThreshold * expectedNumHits)        
00281   //  );
00282 
00283   QueryManager queryManager( querySeqs,hashTable,cerr);
00284   //  MatchTaskPrint print(cout);
00285 
00286   MatchAlgorithm* pMatcher(NULL);
00287 
00288    if ( (qinfo.maxGap!=0)||(qinfo.maxInsert!=0) )
00289   {
00290     pMatcher = new MatchAlgorithmGapped
00291       ( qinfo.maxGap, qinfo.maxInsert, qinfo.minPrint, qinfo.numRepeats );
00292   } // ~if
00293   else
00294   {
00295     //    pMatcher = new MatchAlgorithmUngapped
00296     //   ( qinfo.minPrint, qinfo.numRepeats );
00297     pMatcher = new MatchAlgorithmGapped
00298       ( 0, 0, qinfo.minPrint, qinfo.numRepeats );
00299   } // ~else
00300 
00301   MatchTask* pTask(NULL);
00302 
00303   MatchTaskServer sendToClient;
00304 
00305   SortByMatchType sortByMatch(qinfo.maxMatches,0.25);
00306   SortByPercentType sortByPercent(qinfo.maxMatches,0.25);
00307 
00308   if (    (qinfo.maxMatches==0)
00309        || (    (qinfo.sortMode!=eSortByMatchLength)
00310             && (qinfo.sortMode!=eSortByPercentMatch)
00311             && (qinfo.sortMode!=eSortAndReturnSequence)  )  )
00312   {
00313     pTask = &sendToClient;
00314   }
00315   else if (qinfo.sortMode==eSortByMatchLength)
00316   {
00317     pTask=new CombineTask<SortByMatchType,MatchTaskServer>
00318       ( sortByMatch,sendToClient );
00319   }
00320   else if (qinfo.sortMode==eSortByPercentMatch)
00321   {
00322     pTask=new CombineTask<SortByPercentType,MatchTaskServer>
00323       ( sortByPercent,sendToClient );
00324   }
00325   else if (qinfo.sortMode==eSortAndReturnSequence)
00326   {
00327     pTask=new CombineTask<SortByMatchType,MatchTaskServer>
00328       ( sortByMatch,sendToClient );
00329     sendToClient.sendSequence(true);
00330   }
00331 
00332   assert(pTask!=NULL);
00333 
00334 
00335   //  CombineTask<MatchTaskSort<SortByMatchSize>,MatchTaskServer>
00336   //    task( sort, sendToClient );
00337   //  MatchTaskServer task;
00338 
00339 
00340   //  cout << myPID << ": awaiting mutex lock" << endl;
00341 
00342   if (pthread_mutex_lock(&mutex)!=0)
00343     throw SSAHAException("Couldn't lock mutex!!");
00344 
00345   //  if (pthread_rwlock_rdlock(&rwlock)!=0)
00346   //   throw SSAHAException("Couldn't acquire read write lock!!");
00347 
00348   //  cout << myPID << ": got mutex lock" << endl;
00349 
00350   hashTable.setMaxNumHits
00351   (
00352    (qinfo.clipThreshold==0) 
00353    ? (1<<30) 
00354    : 1+(int)(qinfo.clipThreshold * expectedNumHits)            
00355   );
00356 
00357   hashTable.setSubstituteThreshold
00358   (
00359    (qinfo.substituteThreshold==0)
00360    ? 0
00361    : 1+(int)(qinfo.substituteThreshold * expectedNumHits)
00362   );
00363 
00364 
00365 
00366   queryManager.doQuery( *pMatcher, *pTask ); 
00367 
00368   if (pthread_mutex_unlock(&mutex)!=0)
00369     throw SSAHAException("Couldn't unlock mutex!!");
00370 
00371   //  if (pthread_rwlock_unlock(&rwlock)!=0)
00372   //   throw SSAHAException("Couldn't drop read write lock!!");
00373 
00374   //  cout << myPID << ": relinquished mutex lock" << endl;
00375 
00376   sendToClient.sendMatches(socket);
00377 
00378   if (pTask!=&sendToClient) delete pTask;
00379 
00380   delete pMatcher;
00381 
00382 }
00383 
00384 HashTableGeneric* generateHashTable
00385 (const string& currentHashTableName, string& controlFileName )
00386 {
00387 
00388   cout << myPID << ": attempting to read in hash table " 
00389        << currentHashTableName << endl;
00390 
00391    // create new source reader from index files
00392    {
00393      delete pSourceReader;
00394 
00395      ifstream filesFile( (currentHashTableName+(string)".files").c_str() );
00396      ifstream indexFile( (currentHashTableName+(string)".index").c_str() );
00397      if ((!filesFile.fail())&&(!indexFile.fail()))
00398      {
00399        pSourceReader = new SourceReaderIndex(currentHashTableName);
00400        cout << myPID 
00401             << ": found a sequence source index for this hash table."
00402             << endl;
00403      } // ~if  
00404      else
00405      {
00406        pSourceReader = NULL;
00407        cout << myPID 
00408             << ": did not find a sequence source index for this hash table."
00409             << endl;
00410      } // ~else
00411      filesFile.close();
00412      indexFile.close();
00413    } // ~scope of ifstreams
00414 
00415    HashTableFactory creator(cerr);
00416 
00417    HashTableGeneric* 
00418      pHashTable; // overrides global definition, bit naughty
00419 
00420    if (pSourceReader==NULL)
00421    {
00422      pHashTable = creator.loadHashTable(currentHashTableName); 
00423      // will throw if not successful
00424    } // ~if
00425    else
00426    {
00427      ifstream nameFile( (currentHashTableName+(string)".name").c_str() );
00428      nameFile.seekg(0, ios::end);
00429      if (nameFile.tellg()>tooLargeNameFileSize) // TBD change to 100 million
00430      {
00431        cout << myPID 
00432             << ": sequence name file exceeds " << tooLargeNameFileSize
00433             << " bytes, will not be stored locally"
00434             << endl;
00435        pHashTable = creator.loadHashTable(currentHashTableName, pSourceReader); 
00436      } // ~if
00437      else
00438      {
00439        pHashTable = creator.loadHashTable(currentHashTableName); 
00440      } // ~else
00441    } // ~if
00442 
00443 
00444 
00445 
00446    pHashTable->setMaxNumHits(100000000);
00447 
00448    hello.wordLength = pHashTable->getWordLength();
00449    
00450    if ( pHashTable->getBitsPerSymbol() == gResidueBits )
00451    {
00452      hello.tableType = 
00453        ( ( pHashTable->getSourceDataType() == gDNAData )
00454          ? e5bitTranslatedDNA
00455          : e5bitProtein );
00456    } // ~if
00457    else hello.tableType = e2bitDNA;
00458      
00459    // hello.bitsPerSymbol = pHashTable->getBitsPerSymbol();
00460 
00461    cout << myPID << ": Hash table built using " 
00462         << ((hello.tableType==e2bitDNA)?gBaseBits:gResidueBits)
00463         << " bits per symbol, " << hello.wordLength 
00464         << " symbols per word." << endl;
00465 
00466    cout << myPID << ": Hits are stored using " << 
00467      ((pHashTable->getHitListFormat()==gStandard)
00468       ?(string)"standard (64 bit per hit)"
00469       :(string)"compressed (32 bit per hit)")
00470         << " format." << endl;
00471 
00472    expectedNumHits = pHashTable->getTotalNumWords();
00473 
00474    if (    (pHashTable->getHitListFormat()==gTranslated)
00475            || (pHashTable->getHitListFormat()==g32BitPackedProtein) )
00476    {
00477      expectedNumHits 
00478        /= pow((double)gNumCodonEncodings,(int)pHashTable->getWordLength());
00479    } // ~if
00480    else
00481    {
00482      assert(pHashTable->getBitsPerSymbol()==gBaseBits);
00483      expectedNumHits /= 1<<(2*pHashTable->getWordLength());
00484    } // ~else
00485 
00486 
00487    //   expectedNumHits /= (hello.bitsPerSymbol==gBaseBits)
00488    // ? 1<<(2*hello.wordLength)
00489    // : pow(gNumCodonEncodings,hello.wordLength);
00490 
00491    cout << setprecision(8) << setiosflags(ios::fixed);
00492 
00493    cout << myPID << ": would expect " << expectedNumHits 
00494          << " hits per word for a random database of this size." << endl;
00495 
00496    if (!controlFileName.empty())
00497    {
00498      cout << myPID << ": writing current hash table name to control file "
00499            << controlFileName << endl;
00500      ofstream controlFile(controlFileName.c_str());
00501      controlFile << currentHashTableName;
00502      if (controlFile.fail())
00503      {
00504        cout << myPID 
00505             << ": problem writing to control file, reverting to default mode."
00506             << endl;
00507        controlFileName="";
00508      } // ~if
00509      controlFile.close();
00510    } // ~if
00511 
00512    return pHashTable;
00513 
00514 } // ~generateHashTable
00515 
00516 // Wrappers for thread functions
00517 
00518 void Pthread_create
00519 (pthread_t *tid, const pthread_attr_t *attr, void * (*func)(void *), void *arg)
00520 {
00521   int n;
00522   if ( (n = pthread_create(tid, attr, func, arg)) == 0)
00523     return;
00524   errno = n;
00525   throw NetworkException("pthread_create error");
00526 }
00527 
00528 void Pthread_join(pthread_t tid, void **status)
00529 {
00530   int           n;
00531 
00532   if ( (n = pthread_join(tid, status)) == 0)
00533     return;
00534   errno = n;
00535   throw NetworkException("pthread_join error");
00536 }
00537 
00538 void Pthread_detach(pthread_t tid)
00539 {
00540   int           n;
00541   
00542   if ( (n = pthread_detach(tid)) == 0)
00543     return;
00544   errno = n;
00545   throw NetworkException("pthread_detach error");
00546 }
00547 
00548 void Pthread_kill(pthread_t tid, int signo)
00549 {
00550   int           n;
00551   
00552   if ( (n = pthread_kill(tid, signo)) == 0)
00553     return;
00554   errno = n;
00555   throw NetworkException("pthread_kill error");
00556 }
00557 
00558 
00559 void* threadWrapper( void* arg )
00560 {
00561   int connfd;
00562   connfd = *((int*)arg); 
00563   delete (int*) arg;
00564   pthread_t myThreadNum(pthread_self());
00565   Pthread_detach(myThreadNum);
00566 
00567   char buf[100];
00568   sprintf(buf,"%d/%p",myPID,myThreadNum);
00569   string myPIDstring(buf);
00570 
00571   //  cout << "Connection ID: " << connfd << endl;
00572   try
00573   {
00574     processQuery(connfd, *pHashTable, myPIDstring);
00575   }
00576   catch (const BrokenSocketException& err )
00577   {
00578     cout 
00579       << myPIDstring 
00580       << ": Caught BrokenSocketException in thread, returning without closing"
00581       << endl;
00582     return NULL;
00583   }
00584   catch (const NetworkException& err )
00585   {
00586     cout << myPIDstring << ": Caught NetworkException in thread: " 
00587          << err.what() << "\n";
00588   }
00589   catch (const SSAHAException& err )
00590   {
00591     cout << myPIDstring << ": Caught SSAHA exception in thread: " 
00592          << err.what() << "\n";
00593   }
00594   catch (const std::exception& err )
00595   {
00596     cout << myPIDstring << ": Caught exception in thread: " 
00597          << err.what() << "\n";
00598   }
00599 
00600   Close(connfd);
00601   cout << myPIDstring << ": Exiting thread" << endl;;
00602   return NULL;
00603 } // ~threadWrapper
00604 
00605 
00606 
00607 int main( int numArgs, char* args[] )
00608 {
00609 
00610   
00611 
00612   try
00613   {
00614 
00615     string controlFileName("");
00616     //    bool   doFork(true);
00617     ServerModeType serverMode(eFork);
00618     string modeString("fork");
00619 
00620     ofstream* pFile(NULL);
00621 
00622     myPID = getpid();
00623     char buf[100];
00624     sprintf(buf,"%d",myPID);
00625     string myPIDstring(buf);
00626     
00627     hello.ssahaversion = SERV_VERSION;
00628     
00629     // next bit only used in thread mode
00630     pthread_attr_t threadAttribs;
00631     size_t threadStackSize;
00632      
00633     if ( (numArgs>=4) && (numArgs<=7) )
00634     {
00635       if (numArgs==7)
00636       {
00637         modeString = args[6];
00638         if (modeString=="single")
00639         {
00640           serverMode = eSingle;
00641         }
00642         else if (modeString=="threads")
00643         {
00644           serverMode = eThreads;
00645           if ( pthread_attr_init(&threadAttribs) != 0 )
00646             throw SSAHAException("Problem setting thread attributes!");
00647           
00648           //      if ( pthread_attr_getstacksize
00649           //      (&threadAttribs, &threadStackSize) != 0 )
00650           //   throw SSAHAException("Problem getting stacksize!");
00651           
00652           //      cout << "got stack size of " << threadStackSize << endl;
00653 
00654           //      threadStackSize *= 4;
00655 
00656           //      if ( pthread_attr_setstacksize
00657           //           (&threadAttribs, threadStackSize) != 0 )
00658           //   throw SSAHAException("Problem setting stacksize!");
00659           
00660           //      if ( pthread_attr_getstacksize
00661           //       (&threadAttribs, &threadStackSize) != 0 )
00662           //    throw SSAHAException("Problem getting stacksize!");
00663           
00664           //      cout << "got stack size of " << threadStackSize << endl;
00665 
00666           //      if ( pthread_rwlock_init( &rwlock, NULL ) != 0 )
00667           //    throw SSAHAException("Could not initialize read/write lock!");
00668 
00669 
00670         }
00671         else if (modeString!="fork")
00672         {
00673           cout << "syntax: " << args[0] 
00674                << " hashTableFileName portNumber maxQueryWordsAllowed"
00675                << " [logFileName|noLogFile]"
00676                << " [controlFileName|noControlFile]" 
00677                << " [fork|threads|single]" << endl;
00678           throw SSAHAException("Invalid command line input to server");
00679         } // ~else
00680       }
00681       if (numArgs>=6)
00682       {
00683         controlFileName = std::string(args[5]);
00684         if (controlFileName=="noControlFile") controlFileName="";
00685       }
00686       if ((numArgs>=5)&&((string)args[4]!="noLogFile"))
00687       {
00688         pFile= new ofstream(args[4]);
00689         cout.rdbuf(pFile->rdbuf());
00690       }
00691     } // ~if
00692     else
00693     {
00694       cout << "syntax: " << args[0] 
00695                << " hashTableFileName portNumber maxQueryWordsAllowed"
00696                << " [logFileName|noLogFile]"
00697                << " [controlFileName|noControlFile]" 
00698                << " [fork|threads|single]" << endl;
00699       throw SSAHAException("Invalid command line input to server");
00700     }
00701 
00702     // next 2 lines `switch off' standard error
00703     NullBuffer db;
00704     cerr.rdbuf(&db);
00705 
00706     cout << myPID << ": Master process started " << getTimeNow();
00707 
00708     string currentHashTableName(args[1]);
00709     //    hashTable.loadHashTable((string)args[1]); 
00710 
00711     cout << myPID << ": Server will operate in mode " << modeString << endl;
00712          
00713 
00714 
00715     int portNumber(atoi(args[2]));
00716 
00717     cout << myPID << ": Queries will be accepted via port number " 
00718          << portNumber << "." << endl;
00719 
00720     hello.maxBufferSize = atoi(args[3]);
00721 
00722     cout << myPID << ": Any queries containing more than " 
00723          << hello.maxBufferSize << " total words will be rejected." << endl;
00724 
00725     pHashTable 
00726       = generateHashTable(currentHashTableName, controlFileName);
00727     // generateHashTable now also populates pSourceReader TC 24.1.2
00728 
00729     
00730     //    int loopNum(0), checkControlInterval(10);
00731 
00732     int       listenfd, connfd;
00733     pid_t     childpid;
00734     socklen_t clilen;
00735     struct sockaddr_in  cliaddr, servaddr;
00736     void      sig_chld(int);
00737     void      sig_pipe(int);
00738 
00739     // next 2 vars only used in thread mode
00740     int* iptr;
00741     pthread_t tid;
00742 
00743     listenfd = Socket(AF_INET, SOCK_STREAM, 0);
00744     
00745     //  bzero(&servaddr, sizeof(servaddr));
00746     memset((void*)&servaddr, 0, sizeof(servaddr));
00747     
00748     servaddr.sin_family      = AF_INET;
00749     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
00750     servaddr.sin_port        = htons(portNumber);
00751     
00752     Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));
00753     
00754     Listen(listenfd, LISTENQ);
00755     
00756     Signal(SIGCHLD, sig_chld); // must call waitpid() 
00757     Signal(SIGPIPE, sig_pipe); // must call waitpid() 
00758     
00759     for ( ; ; ) 
00760     {
00761 
00762       clilen = sizeof(cliaddr);
00763       if ((connfd = accept(listenfd, (SA *) &cliaddr, &clilen))< 0)
00764       {
00765         if (errno == EINTR)
00766           continue; // back to for() 
00767         else
00768           err_sys("accept error");
00769       }
00770           
00771 
00772       if (!controlFileName.empty())
00773       {
00774           ifstream controlFile( controlFileName.c_str() );
00775           string newHashTableName("");
00776 
00777           if (controlFile.fail())
00778           {
00779             cout << myPID 
00780                  << ": could not read control file,"
00781                  << " assume reread of existing table is required"
00782                  << endl; 
00783             newHashTableName = currentHashTableName;
00784           } // ~if
00785           else
00786           {
00787             controlFile >> newHashTableName;
00788             controlFile.close();
00789             //      cout << myPID << ": " 
00790             // << currentHashTableName << " " 
00791             // << currentHashTableName.size() << " " 
00792             // << newHashTableName << " " 
00793             // << newHashTableName.size() << " " 
00794             // << ((currentHashTableName==newHashTableName)?'T':'F') 
00795             // << endl;
00796 
00797 
00798             if (newHashTableName!=currentHashTableName)
00799             {
00800               cout << myPID 
00801                    << ": read new hash table name " << newHashTableName 
00802                    << " from control file." << endl;
00803             } // ~if
00804             else newHashTableName=""; // if same as old name don't reload
00805           } // ~else
00806 
00807           if (!newHashTableName.empty())
00808           {
00809             delete pHashTable;
00810             currentHashTableName=newHashTableName;
00811             pHashTable=generateHashTable
00812               (currentHashTableName, controlFileName);
00813           } // ~if
00814 
00815       } // ~if (!controlFileName
00816 
00817       if (serverMode==eFork)
00818       {
00819         if ( (childpid = Fork()) == 0) 
00820           { // child process
00821             Close(listenfd); // close listening socket 
00822             myPID = getpid();
00823             cout << myPID << ": child process started " << getTimeNow();
00824             resourceUsage();
00825             processQuery(connfd,*pHashTable,myPIDstring);       //
00826             resourceUsage();
00827             cout << myPID << ": child process ended " << getTimeNow();
00828             exit(0);
00829           }
00830         try
00831         {
00832           Close(connfd); // parent closes connected socket 
00833         } // ~try
00834         catch (const NetworkException& err )
00835         {
00836           cout << myPID 
00837                << ": Caught NetworkException while closing connection: " 
00838                << err.what() << endl;
00839         }
00840 
00841 
00842       } // ~if
00843       else if (serverMode==eThreads)
00844       {
00845         iptr= new int;
00846         *iptr=connfd;
00847 
00848         int numTries(0);
00849         
00850         while(1)
00851         {
00852           if (pthread_create(&tid, &threadAttribs, 
00853                              &threadWrapper, iptr )==0) break;
00854           sleep(30);
00855           cout << myPID 
00856                << ": failed to create thread, sleep 30 then try again"
00857                << endl;
00858           if (++numTries==10) 
00859             throw NetworkException ("Could not create thread");
00860         } // ~while
00861 
00862 
00863       } // ~else if
00864       else // single process mode
00865       {
00866         cout << myPID << ": query started " << getTimeNow();
00867         resourceUsage();
00868         try
00869         {
00870           processQuery(connfd,*pHashTable,myPIDstring); //
00871         }
00872         catch(const BrokenSocketException& err)
00873         {
00874           cout << myPID << ": caught BrokenSocketException, continuing"
00875                << endl;
00876         }
00877 
00878         resourceUsage();
00879         cout << myPID << ": query ended " << getTimeNow();
00880       } // ~else
00881 
00882     } // ~for ( ; ; )
00883 
00884   } // ~try
00885   catch (const NetworkException& err )
00886   {
00887     cout << myPID << ": Caught NetworkException: " << err.what() << endl;
00888     exit(1);
00889   } // ~catch
00890   catch (const SSAHAException& err )
00891   {
00892     cout << myPID <<": Caught SSAHA exception: " << err.what() << endl;
00893     exit(1);
00894   } // ~catch
00895   catch (const std::exception& err )
00896   {
00897     cout << myPID << ": Caught exception: " << err.what() << endl;
00898     exit(1);
00899   } // ~catch
00900 
00901 
00902 
00903 }
00904 
00905 
00906 // Name:
00907 // Arguments:
00908 // TYPE  NAME  IN/OUT COMMENT
00909 // Returns: TYPE COMMENT
00910 
00911 // End of file SSAHAServer.cpp
00912 

Generated on Fri Dec 21 13:12:15 2007 for ssaha by  doxygen 1.5.2