00001
00002
00003 #include "pipeline.h"
00004 #include "common.h"
00005 #include "sqlNum.h"
00006 #include "dystring.h"
00007 #include "errabort.h"
00008 #include "portable.h"
00009 #include "linefile.h"
00010 #include <sys/types.h>
00011 #include <unistd.h>
00012 #include <sys/wait.h>
00013 #include <signal.h>
00014
00015
00016
00017 struct plProc
00018
00019 {
00020 struct plProc *next;
00021 struct pipeline *pl;
00022 char **cmd;
00023 pid_t pid;
00024 int status;
00025 };
00026
00027 struct pipeline
00028
00029 {
00030 struct pipeline *next;
00031 struct plProc *procs;
00032 char *procName;
00033 int pipeFd;
00034 unsigned options;
00035 FILE* pipeFh;
00036 char* stdioBuf;
00037 struct lineFile *pipeLf;
00038 };
00039
00040
00041 #define FILE_BUF_SIZE 64*1024
00042
00043 static int pipeCreate(int *writeFd)
00044
00045 {
00046 int pipeFds[2];
00047 if (pipe(pipeFds) < 0)
00048 errnoAbort("can't create pipe");
00049 *writeFd = pipeFds[1];
00050 return pipeFds[0];
00051 }
00052
00053 static void safeClose(int *fdPtr)
00054
00055 {
00056 int fd = *fdPtr;
00057 if (fd != -1)
00058 {
00059 if (close(fd) < 0)
00060 errnoAbort("close failed on fd %d", fd);
00061 *fdPtr = -1;
00062 }
00063 }
00064
00065 static char* joinCmd(char **cmd)
00066
00067 {
00068 struct dyString *str = dyStringNew(512);
00069 int i;
00070 for (i = 0; cmd[i] != NULL; i++)
00071 {
00072 if (i > 0)
00073 dyStringAppend(str, " ");
00074 dyStringAppend(str, cmd[i]);
00075 }
00076 return dyStringCannibalize(&str);
00077 }
00078
00079 static char* joinCmds(char ***cmds)
00080
00081 {
00082 struct dyString *str = dyStringNew(512);
00083 int i, j;
00084 for (i = 0; cmds[i] != NULL; i++)
00085 {
00086 if (i > 0)
00087 dyStringAppend(str, " | ");
00088 for (j = 0; cmds[i][j] != NULL; j++)
00089 {
00090 if (j > 0)
00091 dyStringAppend(str, " ");
00092 dyStringAppend(str, cmds[i][j]);
00093 }
00094 }
00095 return dyStringCannibalize(&str);
00096 }
00097
00098 static struct plProc* plProcNew(char **cmd, struct pipeline *pl)
00099
00100 {
00101 int i, cmdLen = 0;
00102 struct plProc* proc;
00103 AllocVar(proc);
00104 proc->pl = pl;
00105
00106 for (i = 0; cmd[i] != NULL; i++)
00107 cmdLen++;
00108 proc->cmd = needMem((cmdLen+1)*sizeof(char*));
00109
00110 for (i = 0; i < cmdLen; i++)
00111 proc->cmd[i] = cloneString(cmd[i]);
00112 proc->cmd[cmdLen] = NULL;
00113 return proc;
00114 }
00115
00116 static void plProcFree(struct plProc *proc)
00117
00118 {
00119 int i;
00120 for (i = 0; proc->cmd[i] != NULL; i++)
00121 freeMem(proc->cmd[i]);
00122 freeMem(proc->cmd);
00123 freeMem(proc);
00124 }
00125
00126 static void childAbortHandler()
00127
00128 {
00129 exit(100);
00130 }
00131
00132 static void plProcSetup(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd)
00133
00134 {
00135 int fd;
00136 struct sigaction sigAct;
00137
00138
00139 pushWarnAbort();
00140 pushAbortHandler(childAbortHandler);
00141
00142
00143 ZeroVar(&sigAct);
00144 sigAct.sa_handler = SIG_IGN;
00145 if (sigaction(SIGPIPE, &sigAct, NULL) != 0)
00146 errnoAbort("failed to set SIGPIPE to SIG_IGN");
00147
00148
00149 if (stdinFd != STDIN_FILENO)
00150 {
00151 if (dup2(stdinFd, STDIN_FILENO) < 0)
00152 errnoAbort("can't dup2 to stdin");
00153 }
00154
00155 if (stdoutFd != STDOUT_FILENO)
00156 {
00157 if (dup2(stdoutFd, STDOUT_FILENO) < 0)
00158 errnoAbort("can't dup2 to stdout");
00159 }
00160
00161 if (stderrFd != STDERR_FILENO)
00162 {
00163 if (dup2(stderrFd, STDERR_FILENO) < 0)
00164 errnoAbort("can't dup2 to stderr");
00165 }
00166
00167
00168 for (fd = STDERR_FILENO+1; fd < 64; fd++)
00169 close(fd);
00170 }
00171
00172 static void plProcExecChild(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd)
00173
00174 {
00175 plProcSetup(proc, stdinFd, stdoutFd, stderrFd);
00176
00177 execvp(proc->cmd[0], proc->cmd);
00178 errnoAbort("exec failed: %s", proc->cmd[0]);
00179 }
00180
00181 static void plProcMemWrite(struct plProc* proc, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize)
00182
00183
00184 {
00185 ssize_t wrCnt;
00186 plProcSetup(proc, STDIN_FILENO, stdoutFd, stderrFd);
00187 wrCnt = write(STDOUT_FILENO, otherEndBuf, otherEndBufSize);
00188 if (wrCnt < 0)
00189 errnoAbort("pipeline input buffer write failed");
00190 else if (wrCnt != otherEndBufSize)
00191 errAbort("pipeline input buffer short write %lld, expected %lld",
00192 (long long)wrCnt, (long long)otherEndBufSize);
00193 else
00194 {
00195 close(STDOUT_FILENO);
00196 exit(0);
00197 }
00198 }
00199
00200 static void plProcWait(struct plProc* proc)
00201
00202 {
00203 if (waitpid(proc->pid, &proc->status, 0) < 0)
00204 errnoAbort("process lost for: \"%s\" in pipeline \"%s\"", joinCmd(proc->cmd),
00205 proc->pl->procName);
00206 if (WIFSIGNALED(proc->status))
00207 errAbort("process terminated on signal %d: \"%s\" in pipeline \"%s\"",
00208 WTERMSIG(proc->status), joinCmd(proc->cmd), proc->pl->procName);
00209 assert(WIFEXITED(proc->status));
00210
00211 if ((WEXITSTATUS(proc->status) != 0) && !(proc->pl->options & pipelineNoAbort))
00212 errAbort("process exited with %d: \"%s\" in pipeline \"%s\"",
00213 WEXITSTATUS(proc->status), joinCmd(proc->cmd), proc->pl->procName);
00214 proc->pid = -1;
00215 }
00216
00217 static struct pipeline* pipelineNew(char ***cmds, unsigned options)
00218
00219 {
00220 static char *memPseudoCmd[] = {"[mem]", NULL};
00221 struct pipeline *pl;
00222 int iCmd;
00223
00224 AllocVar(pl);
00225 pl->pipeFd = -1;
00226 pl->options = options;
00227 pl->procName = joinCmds(cmds);
00228
00229 if (cmds[0] == NULL)
00230 errAbort("no commands in pipeline");
00231
00232 if (options & pipelineMemInput)
00233 {
00234
00235 slAddTail(&pl->procs, plProcNew(memPseudoCmd, pl));
00236 }
00237
00238 for(iCmd = 0; cmds[iCmd] != NULL; iCmd++)
00239 slAddTail(&pl->procs, plProcNew(cmds[iCmd], pl));
00240
00241 return pl;
00242 }
00243
00244 void pipelineFree(struct pipeline **plPtr)
00245
00246 {
00247 struct pipeline *pl = *plPtr;
00248 if (pl != NULL)
00249 {
00250 struct plProc *proc = pl->procs;
00251 while (proc != NULL)
00252 {
00253 struct plProc *delProc = proc;
00254 proc = proc->next;
00255 plProcFree(delProc);
00256 }
00257 freez(&pl->procName);
00258 freez(&pl->stdioBuf);
00259 freez(plPtr);
00260 }
00261 }
00262
00263 static int pipelineExecProc(struct pipeline* pl, struct plProc *proc,
00264 int prevStdoutFd, int stdinFd, int stdoutFd, int stderrFd,
00265 void *otherEndBuf, size_t otherEndBufSize)
00266
00267 {
00268
00269 int procStdinFd, procStdoutFd;
00270 if (proc == pl->procs)
00271 procStdinFd = stdinFd;
00272 else
00273 procStdinFd = prevStdoutFd;
00274 if (proc->next == NULL)
00275 procStdoutFd = stdoutFd;
00276 else
00277 prevStdoutFd = pipeCreate(&procStdoutFd);
00278
00279
00280 if ((proc->pid = fork()) < 0)
00281 errnoAbort("can't fork");
00282 if (proc->pid == 0)
00283 {
00284 if (otherEndBuf != NULL)
00285 plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
00286 else
00287 plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd);
00288 }
00289
00290
00291 if (proc != pl->procs)
00292 safeClose(&procStdinFd);
00293 if (proc->next != NULL)
00294 safeClose(&procStdoutFd);
00295 return prevStdoutFd;
00296 }
00297
00298 static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
00299 void *otherEndBuf, size_t otherEndBufSize)
00300
00301
00302 {
00303 struct plProc *proc;
00304 int prevStdoutFd = -1;
00305 for (proc = pl->procs; proc != NULL; proc = proc->next)
00306 {
00307 prevStdoutFd = pipelineExecProc(pl, proc, prevStdoutFd,
00308 stdinFd, stdoutFd, stderrFd,
00309 otherEndBuf, otherEndBufSize);
00310 otherEndBuf = NULL;
00311 otherEndBufSize = 0;
00312 }
00313 }
00314
00315 static int openRead(char *fname)
00316
00317 {
00318 int fd = open(fname, O_RDONLY);
00319 if (fd < 0)
00320 errnoAbort("can't open for read access: %s", fname);
00321 return fd;
00322 }
00323
00324 static int openWrite(char *fname)
00325
00326 {
00327 int fd = open(fname, O_WRONLY|O_CREAT|O_TRUNC, 0666);
00328 if (fd < 0)
00329 errnoAbort("can't open for write access: %s", fname);
00330 return fd;
00331 }
00332
00333 static void pipelineStartRead(struct pipeline *pl, int stdinFd, int stderrFd,
00334 void *otherEndBuf, size_t otherEndBufSize)
00335
00336 {
00337 int pipeWrFd;
00338 pl->pipeFd = pipeCreate(&pipeWrFd);
00339 pipelineExec(pl, stdinFd, pipeWrFd, stderrFd, otherEndBuf, otherEndBufSize);
00340 safeClose(&pipeWrFd);
00341 }
00342
00343 static void pipelineStartWrite(struct pipeline *pl, int stdoutFd, int stderrFd)
00344
00345 {
00346 int pipeRdFd = pipeCreate(&pl->pipeFd);
00347 pipelineExec(pl, pipeRdFd, stdoutFd, stderrFd, NULL, 0);
00348 safeClose(&pipeRdFd);
00349 }
00350
00351 static void checkOpts(unsigned opts)
00352
00353 {
00354 if (((opts & (pipelineRead|pipelineWrite)) == 0)
00355 || ((opts & (pipelineRead|pipelineWrite)) == (pipelineRead|pipelineWrite)))
00356 errAbort("must specify one of pipelineRead or pipelineWrite to pipelineOpen");
00357 }
00358
00359 struct pipeline *pipelineOpenFd(char ***cmds, unsigned opts,
00360 int otherEndFd, int stderrFd)
00361
00362
00363 {
00364 struct pipeline *pl;
00365
00366 checkOpts(opts);
00367 pl = pipelineNew(cmds, opts);
00368 if (opts & pipelineRead)
00369 pipelineStartRead(pl, otherEndFd, stderrFd, NULL, 0);
00370 else
00371 pipelineStartWrite(pl, otherEndFd, stderrFd);
00372 return pl;
00373 }
00374
00375 struct pipeline *pipelineOpen(char ***cmds, unsigned opts,
00376 char *otherEndFile, char *stderrFile)
00377
00378
00379 {
00380 int otherEndFd;
00381 int stderrFd = (stderrFile == NULL) ? STDERR_FILENO : openWrite(stderrFile);
00382
00383 checkOpts(opts);
00384 if (opts & pipelineRead)
00385 otherEndFd = (otherEndFile == NULL) ? STDIN_FILENO : openRead(otherEndFile);
00386 else
00387 otherEndFd = (otherEndFile == NULL) ? STDOUT_FILENO : openWrite(otherEndFile);
00388 struct pipeline *pl = pipelineOpenFd(cmds, opts, otherEndFd, stderrFd);
00389 safeClose(&otherEndFd);
00390 if (stderrFile != NULL)
00391 safeClose(&stderrFd);
00392 return pl;
00393 }
00394
00395 struct pipeline *pipelineOpenMem(char ***cmds, unsigned opts,
00396 void *otherEndBuf, size_t otherEndBufSize,
00397 int stderrFd)
00398
00399
00400
00401 {
00402 struct pipeline *pl;
00403 checkOpts(opts);
00404 if (opts & pipelineWrite)
00405 errAbort("pipelineOpenMem only supports read pipelines at this time");
00406 opts |= pipelineMemInput;
00407
00408 pl = pipelineNew(cmds, opts);
00409 pipelineStartRead(pl, STDIN_FILENO, stderrFd, otherEndBuf, otherEndBufSize);
00410 return pl;
00411 }
00412
00413 struct pipeline *pipelineOpenFd1(char **cmd, unsigned opts,
00414 int otherEndFd, int stderrFd)
00415
00416 {
00417 char **cmds[2];
00418 cmds[0] = cmd;
00419 cmds[1] = NULL;
00420 return pipelineOpenFd(cmds, opts, otherEndFd, stderrFd);
00421 }
00422
00423 struct pipeline *pipelineOpen1(char **cmd, unsigned opts,
00424 char *otherEndFile, char *stderrFile)
00425
00426 {
00427 char **cmds[2];
00428 cmds[0] = cmd;
00429 cmds[1] = NULL;
00430 return pipelineOpen(cmds, opts, otherEndFile, stderrFile);
00431 }
00432
00433 struct pipeline *pipelineOpenMem1(char **cmd, unsigned opts,
00434 void *otherEndBuf, size_t otherEndBufSize,
00435 int stderrFd)
00436
00437 {
00438 char **cmds[2];
00439 cmds[0] = cmd;
00440 cmds[1] = NULL;
00441 return pipelineOpenMem(cmds, opts, otherEndBuf, otherEndBufSize, stderrFd);
00442 }
00443
00444 char *pipelineDesc(struct pipeline *pl)
00445
00446 {
00447 return pl->procName;
00448 }
00449
00450 int pipelineFd(struct pipeline *pl)
00451
00452 {
00453 return pl->pipeFd;
00454 }
00455
00456 FILE *pipelineFile(struct pipeline *pl)
00457
00458
00459
00460 {
00461 if (pl->pipeFh == NULL)
00462 {
00463
00464 char *mode = (pl->options & pipelineRead) ? "r" : "w";
00465 if (pl->pipeLf != NULL)
00466 errAbort("can't call pipelineFile after having associated a lineFile with a pipeline");
00467 pl->pipeFh = fdopen(pl->pipeFd, mode);
00468 if (pl->pipeFh == NULL)
00469 errnoAbort("fdopen failed for: %s", pl->procName);
00470 pl->stdioBuf = needLargeMem(FILE_BUF_SIZE);
00471 setvbuf(pl->pipeFh, pl->stdioBuf, _IOFBF, FILE_BUF_SIZE);
00472 }
00473 return pl->pipeFh;
00474 }
00475
00476 struct lineFile *pipelineLineFile(struct pipeline *pl)
00477
00478
00479
00480 {
00481 if (pl->pipeLf == NULL)
00482 {
00483
00484 if (pl->pipeFh != NULL)
00485 errAbort("can't call pipelineLineFile after having associated a FILE with a pipeline");
00486 if (pl->options & pipelineWrite)
00487 errAbort("can't associated a lineFile with a write pipeline");
00488 pl->pipeLf = lineFileAttach(pipelineDesc(pl), TRUE, pl->pipeFd);
00489 }
00490 return pl->pipeLf;
00491 }
00492
00493 static void closePipelineFile(struct pipeline *pl)
00494
00495 {
00496 if (pl->options & pipelineWrite)
00497 {
00498 fflush(pl->pipeFh);
00499 if (ferror(pl->pipeFh))
00500 errAbort("write failed to pipeline: %s ", pl->procName);
00501 }
00502 else if (ferror(pl->pipeFh))
00503 errAbort("read failed from pipeline: %s ", pl->procName);
00504
00505 if (fclose(pl->pipeFh) == EOF)
00506 errAbort("close failed on pipeline: %s ", pl->procName);
00507 pl->pipeFh = NULL;
00508 }
00509
00510 static void closePipeline(struct pipeline *pl)
00511
00512 {
00513 if (pl->pipeFh != NULL)
00514 closePipelineFile(pl);
00515 else if (pl->pipeLf != NULL)
00516 lineFileClose(&pl->pipeLf);
00517 else
00518 {
00519 if (close(pl->pipeFd) < 0)
00520 errAbort("close failed on pipeline: %s ", pl->procName);
00521 }
00522 pl->pipeFd = -1;
00523 }
00524
00525 int pipelineWait(struct pipeline *pl)
00526
00527
00528
00529 {
00530 struct plProc *proc;
00531 int exitCode = 0;
00532
00533
00534 if (pl->options & pipelineWrite)
00535 closePipeline(pl);
00536
00537
00538 for (proc = pl->procs; proc != NULL; proc = proc->next)
00539 {
00540 plProcWait(proc);
00541 if ((WEXITSTATUS(proc->status) != 0) && (exitCode == 0))
00542 exitCode = WEXITSTATUS(proc->status);
00543 }
00544
00545
00546 if (pl->options & pipelineRead)
00547 closePipeline(pl);
00548
00549 return exitCode;
00550 }
00551
00552 void pipelineDumpCmds(char ***cmds)
00553
00554 {
00555 char **cmd;
00556 boolean first = TRUE;
00557 while ((cmd = *cmds++) != NULL)
00558 {
00559 char *word;
00560 if (first)
00561 first = FALSE;
00562 else
00563 printf("| ");
00564 while ((word = *cmd++) != NULL)
00565 printf("%s ", word);
00566 }
00567 printf("<BR>\n");
00568 }
00569
00570
00571
00572
00573
00574
00575