lib/pipeline.c

Go to the documentation of this file.
00001 /* pipeline.c - create a process pipeline that can be used for reading or
00002  * writing  */
00003 #include "pipeline.h"
00004 #include "common.h"
00005 #include "sqlNum.h"
00006 #include "dystring.h"
00007 #include "errabort.h"
00008 #include "portable.h"
00009 #include "linefile.h"
00010 #include <sys/types.h>
00011 #include <unistd.h>
00012 #include <sys/wait.h>
00013 #include <signal.h>
00014 
00015 /* FIXME: add close-on-exec startup error reporting */
00016 
struct plProc
/* One process within a pipeline */
{
    struct plProc *next;   /* next process in the ordered list */
    struct pipeline *pl;   /* pipeline this process belongs to */
    char **cmd;            /* NULL-terminated argument vector for this process */
    pid_t pid;             /* process id, -1 if not running */
    int status;            /* status value filled in by wait */
};
00026 
struct pipeline
/* A process pipeline together with the open file attached to one end of it */
{
    struct pipeline *next;     /* next pipeline in a list */
    struct plProc *procs;      /* processes making up the pipeline, in order */
    char *procName;            /* description used in error messages */
    int pipeFd;                /* fd of the pipe to/from the processes, -1 if none */
    unsigned options;          /* option flags the pipeline was created with */
    FILE *pipeFh;              /* stdio FILE wrapped around pipeFd, created on demand */
    char *stdioBuf;            /* buffer handed to stdio for pipeFh, if any */
    struct lineFile *pipeLf;   /* lineFile wrapped around pipeFd, created on demand */
};
00039 
/* Size in bytes of the stdio buffer attached to a pipeline FILE.  The
 * expansion is parenthesized so the macro evaluates as a single value when
 * used inside a larger expression (e.g. x / FILE_BUF_SIZE). */
#define FILE_BUF_SIZE (64*1024)
00042 
static int pipeCreate(int *writeFd)
/* Create a pipe or die.  The write end is stored in *writeFd and the read
 * end is returned. */
{
int fds[2];
int rc = pipe(fds);
if (rc < 0)
    errnoAbort("can't create pipe");
int readFd = fds[0];
*writeFd = fds[1];
return readFd;
}
00052 
static void safeClose(int *fdPtr)
/* Close the descriptor in *fdPtr with error checking, then set *fdPtr to -1
 * to mark it closed.  A value of -1 on entry means already closed, in which
 * case nothing is done. */
{
if (*fdPtr == -1)
    return;
if (close(*fdPtr) < 0)
    errnoAbort("close failed on fd %d", *fdPtr);
*fdPtr = -1;
}
00064 
static char* joinCmd(char **cmd)
/* Join a NULL-terminated command vector into one string, with a single
 * space between words.  Returns the string taken from the dyString. */
{
struct dyString *joined = dyStringNew(512);
char **word;
for (word = cmd; *word != NULL; word++)
    {
    if (word != cmd)
        dyStringAppend(joined, " ");   /* separator before all but the first word */
    dyStringAppend(joined, *word);
    }
return dyStringCannibalize(&joined);
}
00078 
static char* joinCmds(char ***cmds)
/* Join a NULL-terminated vector of command vectors into one string.  Words
 * of a command are separated by a space and commands by " | ". */
{
struct dyString *joined = dyStringNew(512);
char ***cmd;
for (cmd = cmds; *cmd != NULL; cmd++)
    {
    char **word;
    if (cmd != cmds)
        dyStringAppend(joined, " | ");   /* separator before all but the first command */
    for (word = *cmd; *word != NULL; word++)
        {
        if (word != *cmd)
            dyStringAppend(joined, " ");
        dyStringAppend(joined, *word);
        }
    }
return dyStringCannibalize(&joined);
}
00097 
00098 static struct plProc* plProcNew(char **cmd, struct pipeline *pl)
00099 /* create a new plProc object for a command. */
00100 {
00101 int i, cmdLen = 0;
00102 struct plProc* proc;
00103 AllocVar(proc);
00104 proc->pl = pl;
00105 
00106 for (i = 0; cmd[i] != NULL; i++)
00107     cmdLen++;
00108 proc->cmd = needMem((cmdLen+1)*sizeof(char*));
00109 
00110 for (i = 0; i < cmdLen; i++)
00111     proc->cmd[i] = cloneString(cmd[i]);
00112 proc->cmd[cmdLen] = NULL;
00113 return proc;
00114 }
00115 
00116 static void plProcFree(struct plProc *proc)
00117 /* free a plProc object. */
00118 {
00119 int i;
00120 for (i = 0; proc->cmd[i] != NULL; i++)
00121     freeMem(proc->cmd[i]);
00122 freeMem(proc->cmd);
00123 freeMem(proc);
00124 }
00125 
static void childAbortHandler()
/* Abort handler that just exits the process with status 100. */
{
const int abortExitCode = 100;
exit(abortExitCode);
}
00131 
00132 static void plProcSetup(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd)
00133 /* setup signal, error handling, and file descriptors after fork */
00134 {
00135 int fd;
00136 struct sigaction sigAct;
00137 
00138 /* make sure abort handler exits */
00139 pushWarnAbort();
00140 pushAbortHandler(childAbortHandler);
00141 
00142 /* treat a closed pipe as an EOF rather than getting SIGPIPE */
00143 ZeroVar(&sigAct);
00144 sigAct.sa_handler = SIG_IGN;
00145 if (sigaction(SIGPIPE, &sigAct, NULL) != 0)
00146     errnoAbort("failed to set SIGPIPE to SIG_IGN");
00147 
00148 /* child, first setup stdio files */
00149 if (stdinFd != STDIN_FILENO)
00150     {
00151     if (dup2(stdinFd, STDIN_FILENO) < 0)
00152         errnoAbort("can't dup2 to stdin");
00153     }
00154     
00155 if (stdoutFd != STDOUT_FILENO)
00156     {
00157     if (dup2(stdoutFd, STDOUT_FILENO) < 0)
00158         errnoAbort("can't dup2 to stdout");
00159     }
00160     
00161 if (stderrFd != STDERR_FILENO)
00162     {
00163     if (dup2(stderrFd, STDERR_FILENO) < 0)
00164         errnoAbort("can't dup2 to stderr");
00165     }
00166 
00167 /* close other file descriptors */
00168 for (fd = STDERR_FILENO+1; fd < 64; fd++)
00169     close(fd);
00170 }
00171 
00172 static void plProcExecChild(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd)
00173 /* child part of process startup. */
00174 {
00175 plProcSetup(proc, stdinFd, stdoutFd, stderrFd);
00176 /* FIXME: add close-on-exec startup error reporting here */
00177 execvp(proc->cmd[0], proc->cmd);
00178 errnoAbort("exec failed: %s", proc->cmd[0]);
00179 }
00180 
static void plProcMemWrite(struct plProc* proc, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize)
/* Implements the child process that writes a memory buffer to the pipeline
 * after fork.
 *   proc            - pseudo-process entry for the writer
 *   stdoutFd        - descriptor the buffer is written to (installed as stdout)
 *   stderrFd        - descriptor installed as stderr
 *   otherEndBuf     - buffer to write
 *   otherEndBufSize - number of bytes in otherEndBuf
 * write() may transfer fewer bytes than requested, so it is repeated until
 * the whole buffer is drained.  On success stdout is closed and the process
 * exits with status 0; a failed write is reported through errnoAbort, and a
 * write that makes no progress through errAbort. */
{
char *next = otherEndBuf;
size_t remaining = otherEndBufSize;

plProcSetup(proc, STDIN_FILENO, stdoutFd, stderrFd);
while (remaining > 0)
    {
    ssize_t wrCnt = write(STDOUT_FILENO, next, remaining);
    if (wrCnt < 0)
        errnoAbort("pipeline input buffer write failed");
    if (wrCnt == 0)
        errAbort("pipeline input buffer short write %lld, expected %lld",
                 (long long)(otherEndBufSize - remaining), (long long)otherEndBufSize);
    next += wrCnt;
    remaining -= (size_t)wrCnt;
    }
close(STDOUT_FILENO);
exit(0);
}
00199 
00200 static void plProcWait(struct plProc* proc)
00201 /* wait for a process in a pipeline */
00202 {
00203 if (waitpid(proc->pid, &proc->status, 0) < 0)
00204     errnoAbort("process lost for: \"%s\" in pipeline \"%s\"", joinCmd(proc->cmd),
00205                proc->pl->procName);
00206 if (WIFSIGNALED(proc->status))
00207     errAbort("process terminated on signal %d: \"%s\" in pipeline \"%s\"",
00208              WTERMSIG(proc->status), joinCmd(proc->cmd), proc->pl->procName);
00209 assert(WIFEXITED(proc->status));
00210 
00211 if ((WEXITSTATUS(proc->status) != 0) && !(proc->pl->options & pipelineNoAbort))
00212     errAbort("process exited with %d: \"%s\" in pipeline \"%s\"",
00213              WEXITSTATUS(proc->status), joinCmd(proc->cmd), proc->pl->procName);
00214 proc->pid = -1;
00215 }
00216 
00217 static struct pipeline* pipelineNew(char ***cmds, unsigned options)
00218 /* create a new pipeline object. Doesn't start processes */
00219 {
00220 static char *memPseudoCmd[] = {"[mem]", NULL};
00221 struct pipeline *pl;
00222 int iCmd;
00223 
00224 AllocVar(pl);
00225 pl->pipeFd = -1;
00226 pl->options = options;
00227 pl->procName = joinCmds(cmds);
00228 
00229 if (cmds[0] == NULL)
00230     errAbort("no commands in pipeline");
00231 
00232 if (options & pipelineMemInput)
00233     {
00234     /* add proc for forked process to write memory to pipeline */
00235     slAddTail(&pl->procs, plProcNew(memPseudoCmd, pl));
00236     }
00237 
00238 for(iCmd = 0; cmds[iCmd] != NULL; iCmd++)
00239     slAddTail(&pl->procs, plProcNew(cmds[iCmd], pl));
00240 
00241 return pl;
00242 }
00243 
00244 void pipelineFree(struct pipeline **plPtr)
00245 /* free a pipeline object */
00246 {
00247 struct pipeline *pl = *plPtr;
00248 if (pl != NULL)
00249     {
00250     struct plProc *proc = pl->procs;
00251     while (proc != NULL)
00252         {
00253         struct plProc *delProc = proc;
00254         proc = proc->next;
00255         plProcFree(delProc);
00256         }
00257     freez(&pl->procName);
00258     freez(&pl->stdioBuf);
00259     freez(plPtr);
00260     }
00261 }
00262 
00263 static int pipelineExecProc(struct pipeline* pl, struct plProc *proc,
00264                             int prevStdoutFd, int stdinFd, int stdoutFd, int stderrFd,
00265                             void *otherEndBuf, size_t otherEndBufSize)
00266 /* start a process in the pipeline, return the stdout fd of the process */
00267 {
00268 /* determine stdin/stdout to use */
00269 int procStdinFd, procStdoutFd;
00270 if (proc == pl->procs)
00271     procStdinFd = stdinFd; /* first process in pipeline */
00272 else
00273     procStdinFd = prevStdoutFd;
00274 if (proc->next == NULL)
00275     procStdoutFd = stdoutFd; /* last process in pipeline */
00276 else
00277     prevStdoutFd = pipeCreate(&procStdoutFd);
00278 
00279 /* start process */
00280 if ((proc->pid = fork()) < 0)
00281     errnoAbort("can't fork");
00282 if (proc->pid == 0)
00283     {
00284     if (otherEndBuf != NULL)
00285         plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
00286     else
00287         plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd);
00288     }
00289 
00290 /* don't leave intermediate pipes open in parent */
00291 if (proc != pl->procs)
00292     safeClose(&procStdinFd);
00293 if (proc->next != NULL)
00294     safeClose(&procStdoutFd);
00295 return prevStdoutFd;
00296 }
00297 
00298 static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
00299                          void *otherEndBuf, size_t otherEndBufSize)
00300 /* Start all processes in a pipeline, stdinFd and stdoutFd are the ends of
00301  * the pipeline, stderrFd is applied to all processed */
00302 {
00303 struct plProc *proc;
00304 int prevStdoutFd = -1;
00305 for (proc = pl->procs; proc != NULL; proc = proc->next)
00306     {
00307     prevStdoutFd = pipelineExecProc(pl, proc, prevStdoutFd,
00308                                     stdinFd, stdoutFd, stderrFd,
00309                                     otherEndBuf, otherEndBufSize);
00310     otherEndBuf = NULL;  /* only for first process (read pipes) */
00311     otherEndBufSize = 0;
00312     }
00313 }
00314 
static int openRead(char *fname)
/* Open a file for reading and return its descriptor; reports failure
 * through errnoAbort. */
{
int fd = open(fname, O_RDONLY);
if (fd >= 0)
    return fd;
errnoAbort("can't open for read access: %s", fname);
return fd;
}
00323 
static int openWrite(char *fname)
/* Open a file for write access, creating it if needed and truncating any
 * existing contents.  Returns the descriptor; reports failure through
 * errnoAbort. */
{
int flags = O_WRONLY|O_CREAT|O_TRUNC;
int fd = open(fname, flags, 0666);
if (fd >= 0)
    return fd;
errnoAbort("can't open for write access: %s", fname);
return fd;
}
00332 
00333 static void pipelineStartRead(struct pipeline *pl, int stdinFd, int stderrFd,
00334                               void *otherEndBuf, size_t otherEndBufSize)
00335 /* start a read pipeline */
00336 {
00337 int pipeWrFd;
00338 pl->pipeFd = pipeCreate(&pipeWrFd);
00339 pipelineExec(pl, stdinFd, pipeWrFd, stderrFd, otherEndBuf, otherEndBufSize);
00340 safeClose(&pipeWrFd);
00341 }
00342 
00343 static void pipelineStartWrite(struct pipeline *pl, int stdoutFd, int stderrFd)
00344 /* start a write pipeline */
00345 {
00346 int pipeRdFd = pipeCreate(&pl->pipeFd);
00347 pipelineExec(pl, pipeRdFd, stdoutFd, stderrFd, NULL, 0);
00348 safeClose(&pipeRdFd);
00349 }
00350 
00351 static void checkOpts(unsigned opts)
00352 /* check option set for consistency */
00353 {
00354 if (((opts & (pipelineRead|pipelineWrite)) == 0)
00355     || ((opts & (pipelineRead|pipelineWrite)) == (pipelineRead|pipelineWrite)))
00356     errAbort("must specify one of pipelineRead or pipelineWrite to pipelineOpen");
00357 }
00358 
00359 struct pipeline *pipelineOpenFd(char ***cmds, unsigned opts,
00360                                 int otherEndFd, int stderrFd)
00361 /* Create a pipeline from an array of commands.  See pipeline.h for
00362  * full documentation. */
00363 {
00364 struct pipeline *pl;
00365 
00366 checkOpts(opts);
00367 pl = pipelineNew(cmds, opts);
00368 if (opts & pipelineRead)
00369     pipelineStartRead(pl, otherEndFd, stderrFd, NULL, 0);
00370 else
00371     pipelineStartWrite(pl, otherEndFd, stderrFd);
00372 return pl;
00373 }
00374 
00375 struct pipeline *pipelineOpen(char ***cmds, unsigned opts,
00376                               char *otherEndFile, char *stderrFile)
00377 /* Create a pipeline from an array of commands.  See pipeline.h for
00378  * full documentation */
00379 {
00380 int otherEndFd;
00381 int stderrFd = (stderrFile == NULL) ? STDERR_FILENO : openWrite(stderrFile);
00382 
00383 checkOpts(opts);
00384 if (opts & pipelineRead)
00385     otherEndFd = (otherEndFile == NULL) ? STDIN_FILENO : openRead(otherEndFile);
00386 else
00387     otherEndFd = (otherEndFile == NULL) ? STDOUT_FILENO : openWrite(otherEndFile);
00388 struct pipeline *pl = pipelineOpenFd(cmds, opts, otherEndFd, stderrFd);
00389 safeClose(&otherEndFd);
00390 if (stderrFile != NULL)
00391     safeClose(&stderrFd);
00392 return pl;
00393 }
00394 
00395 struct pipeline *pipelineOpenMem(char ***cmds, unsigned opts,
00396                                  void *otherEndBuf, size_t otherEndBufSize,
00397                                  int stderrFd)
00398 /* Create a pipeline from an array of commands, with the pipeline input/output
00399  * in a memory buffer.  See pipeline.h for full documentation.  Currently only
00400  * input to a read pipeline is supported  */
00401 {
00402 struct pipeline *pl;
00403 checkOpts(opts);
00404 if (opts & pipelineWrite)
00405     errAbort("pipelineOpenMem only supports read pipelines at this time");
00406 opts |= pipelineMemInput;
00407 
00408 pl = pipelineNew(cmds, opts);
00409 pipelineStartRead(pl, STDIN_FILENO, stderrFd, otherEndBuf, otherEndBufSize);
00410 return pl;
00411 }
00412 
struct pipeline *pipelineOpenFd1(char **cmd, unsigned opts,
                                 int otherEndFd, int stderrFd)
/* Like pipelineOpenFd(), but takes a single command, which is wrapped in a
 * one-entry command vector. */
{
char **cmds[2] = {cmd, NULL};
return pipelineOpenFd(cmds, opts, otherEndFd, stderrFd);
}
00422 
struct pipeline *pipelineOpen1(char **cmd, unsigned opts,
                               char *otherEndFile, char *stderrFile)
/* Like pipelineOpen(), but takes a single command, which is wrapped in a
 * one-entry command vector. */
{
char **cmds[2] = {cmd, NULL};
return pipelineOpen(cmds, opts, otherEndFile, stderrFile);
}
00432 
struct pipeline *pipelineOpenMem1(char **cmd, unsigned opts,
                                  void *otherEndBuf, size_t otherEndBufSize,
                                  int stderrFd)
/* Like pipelineOpenMem(), but takes a single command, which is wrapped in a
 * one-entry command vector. */
{
char **cmds[2] = {cmd, NULL};
return pipelineOpenMem(cmds, opts, otherEndBuf, otherEndBufSize, stderrFd);
}
00443 
00444 char *pipelineDesc(struct pipeline *pl)
00445 /* Get the description of a pipeline for use in error messages */
00446 {
00447 return pl->procName;
00448 }
00449 
00450 int pipelineFd(struct pipeline *pl)
00451 /* Get the file descriptor for a pipeline */
00452 {
00453 return pl->pipeFd;
00454 }
00455 
00456 FILE *pipelineFile(struct pipeline *pl)
00457 /* Get a FILE object wrapped around the pipeline.  Do not close the FILE, is
00458  * owned by the pipeline object.  A FILE is created on first call to this
00459  * function.  Subsequent calls return the same FILE.*/
00460 {
00461 if (pl->pipeFh == NULL)
00462     {
00463     /* create FILE* on first access */
00464     char *mode = (pl->options & pipelineRead) ? "r" : "w";
00465     if (pl->pipeLf != NULL)
00466         errAbort("can't call pipelineFile after having associated a lineFile with a pipeline");
00467     pl->pipeFh = fdopen(pl->pipeFd, mode);
00468     if (pl->pipeFh == NULL)
00469         errnoAbort("fdopen failed for: %s", pl->procName);
00470     pl->stdioBuf = needLargeMem(FILE_BUF_SIZE);
00471     setvbuf(pl->pipeFh, pl->stdioBuf,  _IOFBF, FILE_BUF_SIZE);
00472     }
00473 return pl->pipeFh;
00474 }
00475 
00476 struct lineFile *pipelineLineFile(struct pipeline *pl)
00477 /* Get a lineFile object wrapped around the pipeline.  Do not close the
00478  * lineFile, is owned by the pipeline object.  A lineFile is created on first
00479  * call to this function.  Subsequent calls return the same object.*/
00480 {
00481 if (pl->pipeLf == NULL)
00482     {
00483     /* create line on first acess */
00484     if (pl->pipeFh != NULL)
00485         errAbort("can't call pipelineLineFile after having associated a FILE with a pipeline");
00486     if (pl->options & pipelineWrite)
00487         errAbort("can't associated a lineFile with a write pipeline");
00488     pl->pipeLf = lineFileAttach(pipelineDesc(pl), TRUE, pl->pipeFd);
00489     }
00490 return pl->pipeLf;
00491 }
00492 
00493 static void closePipelineFile(struct pipeline *pl)
00494 /* close a pipeline with a FILE associated with it */
00495 {
00496 if (pl->options & pipelineWrite)
00497     {
00498     fflush(pl->pipeFh);
00499     if (ferror(pl->pipeFh))
00500         errAbort("write failed to pipeline: %s ", pl->procName);
00501     }
00502 else if (ferror(pl->pipeFh))
00503     errAbort("read failed from pipeline: %s ", pl->procName);
00504 
00505 if (fclose(pl->pipeFh) == EOF)
00506     errAbort("close failed on pipeline: %s ", pl->procName);
00507 pl->pipeFh = NULL;
00508 }
00509 
00510 static void closePipeline(struct pipeline *pl)
00511 /* Close the pipe file */
00512 {
00513 if (pl->pipeFh != NULL)
00514     closePipelineFile(pl);
00515 else if (pl->pipeLf != NULL)
00516     lineFileClose(&pl->pipeLf);
00517 else
00518     {
00519     if (close(pl->pipeFd) < 0)
00520         errAbort("close failed on pipeline: %s ", pl->procName);
00521     }
00522 pl->pipeFd = -1;
00523 }
00524 
00525 int pipelineWait(struct pipeline *pl)
00526 /* Wait for processes in a pipeline to complete; normally aborts if any
00527  * process exists non-zero.  If pipelineNoAbort was specified, return the exit
00528  * code of the first process exit non-zero, or zero if none failed. */
00529 {
00530 struct plProc *proc;
00531 int exitCode = 0;
00532 
00533 /* must close before waits for output pipeline */
00534 if (pl->options & pipelineWrite)
00535     closePipeline(pl);
00536 
00537 /* wait on each process in order */
00538 for (proc = pl->procs; proc != NULL; proc = proc->next)
00539     {
00540     plProcWait(proc);
00541     if ((WEXITSTATUS(proc->status) != 0) && (exitCode == 0))
00542         exitCode = WEXITSTATUS(proc->status);
00543     }
00544 
00545 /* must close after waits for input pipeline */
00546 if (pl->options & pipelineRead)
00547     closePipeline(pl);
00548 
00549 return exitCode;
00550 }
00551 
void pipelineDumpCmds(char ***cmds)
/* Dump out pipeline-formatted commands to stdout for debugging.  Each word
 * is followed by a space, commands are separated by "| ", and the line ends
 * with "<BR>" and a newline. */
{
char ***cmdPtr;
for (cmdPtr = cmds; *cmdPtr != NULL; cmdPtr++)
    {
    char **wordPtr;
    if (cmdPtr != cmds)
        printf("| ");   /* separator before every command but the first */
    for (wordPtr = *cmdPtr; *wordPtr != NULL; wordPtr++)
        printf("%s ", *wordPtr);
    }
printf("<BR>\n");
}
00569 
00570 /*
00571  * Local Variables:
00572  * c-file-style: "jkent-c"
00573  * End:
00574  */
00575 

Generated on Tue Dec 25 18:39:31 2007 for blat by  doxygen 1.5.2