#! /bin/sh # This is a shell archive, meaning: # 1. Remove everything above the #! /bin/sh line. # 2. Save the resulting text in a file. # 3. Execute the file with /bin/sh (not csh) to create the files: # README # README2 # Makefile # team.c # team.1.dist # team.c.dist # This archive created: Sat Sep 25 08:53:15 1999 # By: Daniel E. Singer (des@cs.duke.edu) # Duke University CS Dept. export PATH; PATH=/bin:$PATH echo shar: extracting "'README'" '(1960 characters)' if test -f 'README' then echo shar: will not over-write existing file "'README'" else sed 's/^ X//' << \SHAR_EOF > 'README' XFrom: pcg@aber.ac.uk (Piercarlo Grandi) XNewsgroups: comp.sources.unix XSubject: v27i195: team - portable multi-buffered tape streaming utility, Part01/01 XDate: 13 Jan 1994 13:24:40 -0800 XApproved: vixie@gw.home.vix.com XMessage-ID: <1.758496249.28141@gw.home.vix.com> X XSubmitted-By: pcg@aber.ac.uk (Piercarlo Grandi) XPosting-Number: Volume 27, Issue 195 XArchive-Name: team/part01 X XThere exist a few filters that help tapes streams by buffering IO and Xallowing reads to overlaps with writes under Unix. Most of these filters Xrely on relatively unportable features, for example SYSV like shared Xmemory. X Xteam is a filter that runs essentially unchanged on any Unix version, as Xit relies only on features present in V7. A number of team processes X(team members) share a common input fd and a common output fd, and they Xtake turns at reading from the former and writing to the latter; they Xsynchronize by using a ring of pipes between them, where a "read-enable" Xand a "write-enable" token circulate. X Xteam is not just very portable, but also portable and efficient. It also Xhas some bells & whistles, like command line options to specify the Xnumber of processes in a team, the block size for IO, and the volume Xsize of the input or output media. It also optionally reports its Xprogress. X XPrevious versions of team have been circulating (e.g. via alt.sources) Xfor several years; I have not found a bug for a long time, even if Xsurely they will exist. X XThe team source is GPL'ed, and it comes with no warranty. X X Note: this program was developed entirely by the author in his own X time, using his own resources, on his machine, in the context of X his own research activities. In no way has the University of Wales, X Aberystwyth contributed aided or abetted to this work, for which X they bear no responsibility whatsoever. I am grateful to UWA for the X ability to use their systems (as a paying customer) to post this work. X X pcg@aber.ac.uk (Piercarlo Grandi) X SHAR_EOF if test 1960 -ne "`wc -c < 'README'`" then echo shar: error transmitting "'README'" '(should have been 1960 characters)' fi fi # end of overwriting check echo shar: extracting "'README2'" '(552 characters)' if test -f 'README2' then echo shar: will not over-write existing file "'README2'" else sed 's/^ X//' << \SHAR_EOF > 'README2' X/* X * Note: This version of the `team' program by Piercarlo Grandi X * has been severely hacked on by Daniel E. Singer , X * and includes many changes. Its basic functionality remains the X * same. The current version is tailored for Solaris 2.6, and will X * probably not compile on other systems. Since so many changes were X * needed, most of the "platform independent" provisions have been X * stripped out. X * X * Note: The man page is the original, and is not updated. Try X * `team -h' for current usage and help information. X */ SHAR_EOF if test 552 -ne "`wc -c < 'README2'`" then echo shar: error transmitting "'README2'" '(should have been 552 characters)' fi fi # end of overwriting check echo shar: extracting "'Makefile'" '(1052 characters)' if test -f 'Makefile' then echo shar: will not over-write existing file "'Makefile'" else sed 's/^ X//' << \SHAR_EOF > 'Makefile' X# Makefile for the `team' program, tuned for Solaris 2.6 X# 4/98, D.Singer X XPROG = team X XSHARE_FILE_NAME = ${PROG}.share XSHARE_FILE_NAME_ZIPPED = ${SHARE_FILE_NAME}.gz XSHARE_FILES = README README2 Makefile ${PROG}.c ${PROG}.1.dist ${PROG}.c.dist X XMAN1 = /usr/man/man1 X XCC = /opt/SUNWspro/bin/cc XCFLAGS = -O -D_FILE_OFFSET_BITS=64 XLDFLAGS = -s X XCP = /usr/bin/cp XRM = /usr/bin/rm XSHAR = /usr/local/bin/shar XGZIP = /usr/local/bin/gzip X X Xall: ${PROG} X Xclean:: X rm -f ${PROG} ${PROG}.o X X${PROG}: ${PROG}.c X ${CC} ${CFLAGS} `getconf LFS_CFLAGS` -o ${PROG} ${PROG}.c \ X `getconf LFS_LDFLAGS` `getconf LFS_LIBS` X Xlint: X /auto/pkg/SUNWspro/bin/lint -fd -Nlevel=4 -errchk=longptr64 ${CFLAGS} `getconf LFS_CFLAGS` -o ${PROG} ${PROG}.c \ X `getconf LFS_LDFLAGS` `getconf LFS_LIBS` X X#man: man1 X# X#man1: ${MAN1}/${PROG}.1 X# ${CP} ${PROG}.1 ${MAN1} X Xshare: ${SHARE_FILE_NAME_ZIPPED} X X${SHARE_FILE_NAME_ZIPPED}: ${SHARE_FILES} X ${RM} -f ${SHARE_FILE_NAME} ${SHARE_FILE_NAME_ZIPPED} X ${SHAR} -a ${SHARE_FILES} > ${SHARE_FILE_NAME} X ${GZIP} ${SHARE_FILE_NAME} SHAR_EOF if test 1052 -ne "`wc -c < 'Makefile'`" then echo shar: error transmitting "'Makefile'" '(should have been 1052 characters)' fi fi # end of overwriting check echo shar: extracting "'team.c'" '(51880 characters)' if test -f 'team.c' then echo shar: will not over-write existing file "'team.c'" else sed 's/^ X//' << \SHAR_EOF > 'team.c' X#ifndef lint Xstatic char Notice[] = X "Copyright 1987,1989 Piercarlo Grandi. All rights reserved."; Xstatic char SCCS_ID[] = X "@(#) /u/des/src/team/team-solaris_2.6/team.c 1.11 99/07/29 17:27:36"; X#endif X X X/* X This program is free software; you can redistribute it and/or X modify it under the terms of the GNU General Public License as X published by the Free Software Foundation; either version 2, or X (at your option) any later version. X X This program is distributed in the hope that it will be useful, X but WITHOUT ANY WARRANTY; without even the implied warranty of X MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the X GNU General Public License for more details. X X You may have received a copy of the GNU General Public License X along with this program; if not, write to the Free Software X Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. X*/ X/* X * Note: Comments formatted like the one above are the originals by X * P.Grandi. Comments formatted like this one are for the most part X * added by D.Singer. X */ X X/* X * the '-d' (debug) option is only compiled in if this is defined X */ X#define DEBUG /**/ X X X/* X * Note: This version of the `team' program by Piercarlo Grandi X * has been severely hacked on by Daniel E. Singer , X * and includes many changes. Its basic functionality remains the X * same. The current version is tailored for Solaris 2.6, and will X * probably not compile on other systems. Since so many changes were X * needed, most of the "platform independent" provisions have been X * stripped out. X */ X X/* X UNIX programs normally do synchronous read and write, that is, X you read and then you write; no overlap is possible. X X This is especially catastrophic for device to device copies, X whereit is important to minimize elapsed time, by overlapping X activity on one with activity on another. X X To obtain this, a multiprocess structure is necessary under X UNIX. This program is functionally equivalento to a pipe, in X that it copies its input (fd 0) to its output (fd 1) link. X X This programs is executed as a Team of N processes, called X Guys, all of which share the same input and output links; the X first reads a chunk of input, awakens the second, writes the X chunk to its output; the second does the same, and the last X awakens the first. X X Since this process is essentially cyclic, we use a ring of X pipes to synchronize the Guys. Each guy has un input pipe from X the upstream guy and an output pipe to the downstream guy. X Whenever a guy receives a READ command from the upstream, it X first reads a block and then passes on the READ command X downstream; it then waits for a WRITE command from upstream, X and then writes the block and after that passes the WRITE X command downstream. A count of how much has been processed is X also passwd along, for statistics and verification. X X Two other commands are used, one is STOP, and is sent X downstream from the guy that detects the end of file of the X input, after which the guy exits, and ABORT, which is sent X downstream from the guy which detects trouble in the guy X upstream to it, which has much the same effect. X*/ X X X/* X * '-v' option messages will go to a special process if this is defined X */ X#define PMESG /**/ X X X/* int size for Big (64 bit) Files */ X#define bf_size u_longlong_t X X#define TeamLVOLSZ ((bf_size)1 << 10) /* Low volume size */ X#define TeamHVOLSZ ((bf_size)1 << 40) /* High volume size */ X#define TeamDVOLSZ TeamHVOLSZ /* Default volume size */ X X#define TeamLBUFSZ ((u_int)32) /* Low buffer size */ X#define TeamHBUFSZ ((u_int)64 << 20) /* High buffer size */ X#define TeamDBUFSZ ((u_int)10 << 10) /* Default buffer size */ X X#define TeamLTEAMSZ 2 /* Low # of processes */ X#define TeamHTEAMSZ 32 /* High # of processes */ X#define TeamDTEAMSZ 4 /* Default # of processes */ X X#define TeamLNBUFS 1 /* Low # of bufs per process */ X#define TeamHNBUFS 256 /* High # of bufs per process */ X#define TeamDNBUFS 1 /* Default # of bufs per process */ X X/* X External components... Probably the only system dependent part X of this program, as some systems have something in X /usr/include/sys where others have it in /usr/include. X X Also, the mesg() procedure is highly system dependent... watch X out for locking and variable number of arguments. X*/ X X#include X#include X#include X#include X#include X#include X X X#define reg register X#define bool short X#define TRUE 1 X#define FALSE 0 X/*#define nil(type) ((type) 0)*/ X#define MAX(A,B) ((A) >= (B) ? (A) : (B)) Xtypedef char* pointer; X X#define off_t longlong_t Xtypedef off_t address; X X# if (! __STDC__) X# define void int X# define const /* const */ X# endif X X#ifdef DEBUG Xstatic int debug = 0; X# define Mesg(list) { if (debug) { (void)sprintf list ; mesg(msgbuf); } } X#else X# define Mesg(list) ; X#endif X X Xstatic bool verbose = 0; /* 1 = verbose, 2 = verboser */ Xstatic bool report = FALSE; /* do report */ Xstatic bool guyhaderror = FALSE; /* error status */ Xstatic bool use_volumes = FALSE; /* do volumes */ Xstatic int num_bufs = TeamDNBUFS; /* num buffers per Guy */ X Xextern int errno; Xstatic time_t origin; X Xextern time_t time(time_t *); Xextern int atoi(const char *); Xextern char *malloc(unsigned); Xextern char *calloc(size_t,size_t); Xextern char *strchr(const char *,int); X Xextern char *optarg; Xextern int optind; X X#ifdef PMESG X static int pmsg[2]; /* file descriptors for PMESG */ X static pid_t ppid; /* pid of PMESG child process */ X#endif X X/* X * macros and data for 'Fd' (file descriptor) routines X */ X X#define FdCLOSED 0 X#define FdOPEN 1 X#define FdEOF 2 X#define FdERROR 3 X Xtypedef struct Fd Fd; Xstruct Fd X{ X int fd, /* this is an actual file descriptor number */ X status; /* current status of this I/O descriptor */ X bf_size size; /* logical volume size of the associated device X * or data stream */ X}; X Xstatic Fd X FdIn, /* Fd associated with stdin */ X FdOut; /* Fd associated with stdout */ X X/* X A Token is scalar value representing a command. X*/ X/* X * Note: these tokens are no longer used, and should be removed X * at some point; X * X * macros and data used for token passing on the interprocess ring; X * X * the READ and WRITE tokens could actually be combined to a single X * token, with the initial sender just sending out two of them, since X * a READ is *always* followed by a WRITE; the guy process could just X * keep a flip-flop value, and act approriately; X */ X X#if 0 X typedef char Token; X X# define TokenREAD 0 X# define TokenWRITE 1 X# define TokenSTOP 2 X# define TokenABORT 3 X X /* the order of these must match the definitions above! */ X static char X *tokenname[] = { "READ", "WRITE", "STOP", "ABORT" }; X#endif X X/* X * read/write state, used by Guy processes; X * the state just flip-flops after any data read or write; X */ X#define IO_READ 0 X#define IO_WRITE 1 X X/* X * statuses returned by 'RingReceive()' X */ X#define REC_OK 1 X#define REC_EOF 0 X#define REC_ERROR -1 X X/* X * these are the messages that the "Guys" pass to each other via X * the pipes to coordinate their actions; X */ Xtypedef struct RingMsg RingMsg; Xstruct RingMsg X{ X#if 0 X /* tokens not used anymore */ X Token token; /* action to take */ X#endif X bool volume; /* use volumes */ X bf_size done; /* amount read or written by all Guys so far */ X}; X X#ifdef PMESG X/* X * struct used for '-v' (verbose) option messages X */ Xtypedef struct pmsg_buf pmsg_buf; Xstruct pmsg_buf X{ X char pmb_type; /* the type, 'R'ead or 'W'rite */ X short pmb_num; /* the number of the Guy process */ X int pmb_recs; /* the number of records that were read or written */ X bf_size pmb_bytes; /* the number of bytes read or written */ X}; X#endif X X/* X * info for a sub-process X */ Xtypedef struct Guy Guy; Xstruct Guy X{ X short num; /* which Guy */ X pid_t pid; /* Process ID */ X int inRing, /* to read message ring */ X outRing; /* to write message ring */ X}; X X/* X * struct to hold top level info on a Team of Guy processes and X * associated data channels (pipes); X */ Xtypedef struct Team Team; Xstruct Team X{ X Guy *guys; /* pointer to an array of Guys */ X unsigned size; /* number of Guys to activate */ X unsigned active; /* number Guys spawned and started, X * will also count down when ending */ X}; X X/* X * buffers and data used mainly for messages X */ Xstatic Xchar msgbuf[1001], /* status message buffer */ X *mbuf, /* pointer into msgbuf */ X myname[21], /* process identifier, used in messages */ X *prog; /* program name, minus path */ Xstatic Xint mynum; /* leader = 0, guys = 1,2,... */ X X Xstatic void Xmesg( X char *m X) X{ X /* (void)fprintf(stderr,"%ld ",(long)time((time_t *)0)); */ X (void)fputs(m,stderr); X X /* keeps lint happy */ X return (0); X} X X X/* X * routine to print '-v' (verbose) status messages; X * if PMESG is defined, this writes a rec to the f.d. that will be X * read and reported by the special child process; X * otherwise, this is just a front end to 'mesg()'; X */ Xstatic void Xvmesg( X reg char type, X reg short num, X reg short recs, X bf_size bytes X) X{ X#ifdef PMESG X pmsg_buf buf; X X buf.pmb_type = type; X buf.pmb_num = num; X buf.pmb_recs = recs; X buf.pmb_bytes = bytes; X X (void)write(pmsg[1],&buf,(size_t)(sizeof buf)); X#else X reg char *typ = (type == 'R' ? "read" : "written"); X# ifdef DEBUG X reg char eol = (debug ? '\n' : '\r'); X# else X reg char eol = '\r'; X# endif X (void)sprintf(mbuf,"%lluk %-8s%c",bytes>>10,typ,eol); X mesg(msgbuf); X#endif X X /* keeps lint happy */ X return (0); X} X X X /************************** X **************************/ X X X/* X The regular UNIX read and write calls are not guaranteed to process X all the bytes requested. These procedures guarantee that if the X request is for N bytes, all of them are read or written, unless there X is an error or eof. X*/ X X/* X * simple initialization of an Fd struct; X * the fd will presumably be associated with a valid open file descriptor; X * returns 1 if this looks like a valid file descriptor; X */ Xstatic bool XFdOpen( X reg Fd *fd, X int ffd, X bf_size size X) X{ X fd->fd = ffd; X fd->status = (ffd >= 0) ? FdOPEN : FdCLOSED; X fd->size = size; X X Mesg((mbuf,"FdOpen fd %d\n",ffd)) X X return (ffd >= 0); X} X X/* X * close an Fd struct; X * guarantees a flush? X * returns 1 if close successful; X */ Xstatic bool XFdClose( X reg Fd *fd X) X{ X int ffd; /* holds on to fd->fd value */ X X ffd = fd->fd; X fd->fd = -1; X fd->status = FdCLOSED; X X Mesg((mbuf,"FdClose fd %d\n",ffd)) X X return (close(ffd) >= 0); X} X X#if 0 X/* X * make one Fd struct a `dup'licate of another; X * the file descriptors will be different; X * this should return (< 0) if `dup' fails; X */ Xstatic bool XFdCopy( X reg Fd *to, X reg Fd *from X) X{ X to->size = from->size; X to->status = from->status; X to->fd = dup(from->fd); X X Mesg((mbuf,"FdCopy of %d is %d\n",from->fd,to->fd)) X X return (to->fd >= 0); X} X#endif 1/0 X X/* X * make an exact copy of an Fd struct; X */ Xstatic void XFdSet( X reg Fd *to, X reg Fd *from X) X{ X if (from->fd < 0) { X (void)sprintf(mbuf,"set an invalid fd\n"); X mesg(msgbuf); X } X to->size = from->size; X to->status = from->status; X to->fd = from->fd; X X /* keeps lint happy */ X return (0); X} X X/* X * zero out an Fd struct; X */ Xstatic void XFdReset( X reg Fd *fd X) X{ X fd->fd = -1; X fd->status = FdCLOSED; X fd->size = 0; X X /* keeps lint happy */ X return (0); X} X X/* X * this routine can be called to attempt to resume copying if an X * error or the end of a volume is encountered; X * a "c" response resets the file pointer to the beginning of the volume; X * X * From the man page: X X c This means that the user wishes to continue, but the X file pointer shall be reset to the beginning of the X volume, as a new volume has been started anew. This is X the the answer to give on end of volume when writing to X multiple floppies, etc... X X y The user simply wishes to continue, without any change. X X n The user wishes to stop; the program will be terminated X or aborted. X */ Xstatic bf_size XFdRetry( X reg Fd *fd, X char *which, X bf_size done, X bf_size space, X bool *do_volumes X) X{ X int tty; X char reply[99]; X struct stat st; X char c; X X if (fstat(fd->fd,&st) < 0) { X perror(which); X return (0); X } X X /* X * we're only interested in data coming from a X * character or block device X * (for some reason...) X */ X st.st_mode &= S_IFMT; /* file type */ X if (st.st_mode != S_IFCHR && st.st_mode != S_IFBLK) X return (0); X X /* X * skip this if not talking to a terminal X */ X if (!isatty(fileno(stderr))) X return (0); X X if ((tty = open("/dev/tty",0)) < 0) { X perror("/dev/tty"); X return (0); X } X X do { X#if (defined i386 || defined sun) X/* this is the one being used... */ X extern char *(sys_errlist[]); X char *errmsg = sys_errlist[errno]; X#else X char errmsg[32]; X (void) sprintf(errmsg,"Error %d",errno); X#endif X if (verbose) { X (void)sprintf(mbuf,"\n"); X mesg(mbuf); X } X if (errno) { X (void)sprintf(mbuf,"'%s' on %s after %lluk. Continue [cyn] ? ",errmsg,which,done>>10); X mesg(mbuf); X } X else { X (void)sprintf(mbuf,"EOF on %s after %lluk. Continue [cyn] ? ",which,done>>10); X mesg(mbuf); X } X X reply[0] = '\0'; X (void) read((int)tty,(char*)&reply[0],(size_t)1); X for ( ; ; ) { X switch (read((int)tty,(char*)&reply[1],(size_t)1)) { X case 0: X case -1: X default: X reply[0] = 'n'; X break; X case 1: X if (reply[1] != '\n') X continue; X } X break; X } X X /*(void) read((int)tty,(char*)reply,(size_t)(sizeof reply));*/ X X } while (strchr("cCyYnN",reply[0]) == (char*)0); X X (void) close(tty); X X if (strchr("nN",reply[0]) != 0) { X *do_volumes = 0; X return (0); X } X X errno = 0; X X if (strchr("cC",reply[0]) != 0) { X (void) lseek((int)fd->fd,(off_t)0,(int)0); X return (fd->size); X } X X /* 'y' */ X return (space); X} X X/* X * returns the lesser of the two (hopefully positive) numbers X */ Xstatic size_t XFdCanDo( X reg bf_size buf_remain, /* space remaining in the buffer */ X reg bf_size vol_remain, /* space remaining in the volume */ X bool uv /* use volumes */ X) X{ X return ( (size_t)(uv ? (buf_remain <= vol_remain ? buf_remain : vol_remain) : buf_remain) ); X} X X/* X * tries to read in a complete buffer from the input; X * this routine will attempt to retry reading until a full buffer X * is read (this corresponds with the bufsize on the command line); X * retrying includes prompting the user to change volumes; X * returns number bytes read, or 0, or -1; X */ Xstatic ssize_t XFdRead( X reg Fd *fd, /* the data we're reading */ X pointer buffer, /* were to read into */ X size_t todo, /* buffer size we want to fill */ X size_t bsize, /* block size for reading */ X bf_size done, /* num bytes read of this volume so far */ X bool *do_volumes, /* whether or not to pay attention to volume X * boundaries */ X short guynum /* Guy number for verbose message */ X) X{ X reg bf_size space; /* num bytes remaining in this volume */ X reg size_t num2read; /* num bytes to read this read() */ X reg ssize_t bytesRead = 0; /* num bytes read this read() */ X reg size_t justDone = 0; /* num bytes read this call */ X X switch (fd->status) { X X case FdEOF: X return ( 0); X X case FdERROR: X return (-1); X X case FdCLOSED: X return (-1); X X case FdOPEN: X break; X X } X X /* possible space (num bytes) remaining in this volume */ X space = fd->size - (done % fd->size); X X while ( justDone < todo && (space != 0 || !*do_volumes) ) { X X num2read = (size_t)FdCanDo((bf_size)todo-justDone, X (bf_size)space-justDone,*do_volumes); X X if (num2read > bsize) X num2read = bsize; X X bytesRead = read((int)fd->fd,(char*)buffer+justDone, X (size_t)num2read); X X Mesg((mbuf,"FdRead %d, got %d\n",fd->fd,bytesRead)) X X if (verbose > 1 && bytesRead > (ssize_t)0) X vmesg('R',guynum,(int)1,done+bytesRead); X X if (!*do_volumes) { X if (bytesRead <= 0) X break; X justDone += bytesRead; X continue; X } X X if (bytesRead <= 0 || (justDone += bytesRead) == space) X space = FdRetry(fd,"input",(bf_size)done+justDone,(bf_size)space-justDone,do_volumes); X } X X if (bytesRead == 0) X fd->status = FdEOF; X else if (bytesRead < 0) X fd->status = FdERROR; X X Mesg((mbuf,"FdRead %d, reads %u, last %d\n",fd->fd,justDone,bytesRead)) X X return ( (justDone == 0) ? bytesRead : justDone ); X} X X/* X * tries to write out a complete buffer to the output; X * this routine will attempt to retry writing until a full buffer X * is written (this corresponds with the bufsize on the command line); X * retrying includes prompting the user to change volumes; X * returns number bytes written, or 0, or -1; X * X * Note: a probably bug here is that if an incomplete block is written X * to the end of a volume, the next volume will start with an incomplete X * block. This can be avoided by insuring that output volume size is an X * exact multiple of the output block size. X */ Xstatic ssize_t XFdWrite( X reg Fd *fd, /* the data we're writing */ X pointer buffer, /* were to read from */ X size_t todo, /* buffer size we want to copy */ X size_t blksize, /* max block size to write at one time */ X bf_size done, /* num bytes written to this volume so far */ X bool *do_volumes, /* whether or not to pay attention to X * volume boundaries */ X short guynum /* Guy number for verbose message */ X) X{ X reg bf_size space; /* num bytes remaining in this volume */ X reg size_t num2write; /* num bytes to write this write() */ X reg ssize_t bytesWritten = 0; /* num bytes written this write() */ X reg size_t justDone = 0; /* num bytes written this call */ X X switch (fd->status) { X X case FdEOF: X return ( 0); X X case FdERROR: X return (-1); X X case FdCLOSED: X return (-1); X X case FdOPEN: X break; X X } X X /* possible space (num bytes) remaining in this volume */ X space = fd->size - (done % fd->size); X X while (justDone < todo && (space != 0 || !*do_volumes) ) { X X num2write = (size_t)FdCanDo((bf_size)todo-justDone, X (bf_size)space-justDone,*do_volumes); X X if (num2write > blksize) X num2write = blksize; X X bytesWritten = write((int)fd->fd,(char*)buffer+justDone, X (size_t)num2write); X X Mesg((mbuf,"FdWrite %d, put %d\n",fd->fd,bytesWritten)) X X if (verbose > 1 && bytesWritten > (ssize_t)0) X vmesg('W',guynum,(int)1,done+bytesWritten); X X if (!*do_volumes) { X if (bytesWritten <= 0) X break; X justDone += bytesWritten; X continue; X } X X if (bytesWritten <= 0 || (justDone += bytesWritten) == space) X space = FdRetry(fd,"output",(bf_size)done+justDone,(bf_size)space-justDone,do_volumes); X } X X Mesg((mbuf,"FdWrite %d, writes %u, last %d\n",fd->fd,justDone,bytesWritten)) X X if (bytesWritten == 0) X fd->status = FdEOF; X else if (bytesWritten < 0) X fd->status = FdERROR; X X return ( (justDone == 0) ? bytesWritten : justDone ); X} X X X /************************** X **************************/ X X X/* X Here we represent Streams as Fds; this is is not entirely X appropriate, as Fds have also a volume size, and relatively X high overhead write and read functions. Well, we just take X some liberties with abstraction levels here. Actually we X should have an Fd abstraction for stream pipes and a Vol X abstraction for input and output... X*/ X/* X * Note: the above paragraph doesn't really apply any more... X * (ie, the Fd abstraction is no longer applied to the message ring. X * I found it a bit messy and confusing in that context. -des) X */ X X/* X * open a pipeline to be used to communicate between processes; X * returns TRUE if the pipe was opened successfully; X */ Xstatic bool XRingPipe( X int *inring, /* the read end of the pipe */ X int *outring /* the write end of the pipe */ X) X{ X int links[2]; X X Mesg((mbuf,"RingPipe creating pipe\n")) X X if (pipe(links) < 0) { X perror("team: RingPipe: error opening pipe"); X return (FALSE); X } X X *inring = links[0]; X *outring = links[1]; X X Mesg((mbuf,"RingPipe fd inring %d, outring %d\n",links[0],links[1])) X X return (TRUE); X} X X/* X * send a message down a pipe to another process; X * returns 1 if whole message sent successfully; X */ Xstatic bool XRingSend( X int fd, /* the pipe to write */ X#if 0 X Token token, /* action to send */ X#endif X bool volume, /* use volumes */ X bf_size done /* bytes read or written so far */ X) X{ X RingMsg message; /* message to send */ X reg int num; /* number of bytes written */ X X#if 0 X message.token = token; X#endif X message.volume = volume; X message.done = done; X X Mesg((mbuf,"RingSend fd %d, writing\n",fd)) X X num = write(fd,(pointer)&message,sizeof message); X X#if 0 X Mesg((mbuf,"RingSend fd %d, sent %d bytes, token %s (%d)\n",fd,num,tokenname[token],token)) X#else X Mesg((mbuf,"RingSend fd %d, sent %d bytes\n",fd,num)) X#endif X X return (num == sizeof message); X} X X/* X * get a message from a pipe between processes; X * returns the info to the data items pointed to by the args; X * returns 1 if a whole message was read; X * returns 0 on EOF; X * returns -1 on error; X */ Xstatic bool XRingReceive( X int fd, /* the pipe to write */ X bool *volume, /* use volumes */ X bf_size *donep /* bytes read or written so far */ X) X{ X RingMsg message; /* message read */ X reg int num; /* number of bytes read */ X X Mesg((mbuf,"RingReceive fd %d, reading\n",fd)) X X num = read(fd,(pointer)&message,sizeof message); X X *volume = message.volume; X *donep = message.done; X X Mesg((mbuf,"RingReceive fd %d, got %d bytes\n",fd,num)) X X /* got a message */ X if (num == sizeof message) X return (REC_OK); X /* got EOF */ X if (num == 0) X return (REC_EOF); X /* got munged */ X return (REC_ERROR); X} X X X /************************** X **************************/ X X X#ifdef PMESG X/* X * this will fork() a process that will then be used to collect '-v' X * (verbose) option messages from child processes, and display them X * in a more informative format; X */ Xint XRunPmesg() { X pmsg_buf buf; X /*static bf_size num_read = 0,*/ X bf_size num_read = 0, X num_writ = 0; X reg char eol = '\r'; X short last_read_num = 0, X last_writ_num = 0; X unsigned int seconds, X read_cnt = 0, X writ_cnt = 0, X read_blocks = 0, X writ_blocks = 0; X reg int n; X X /* set up the pipe */ X Mesg((mbuf,"RunPmesg creating pipe\n")) X if (pipe(pmsg) < 0) { X perror("team: RunPmesg: error opening pipe"); X return (FALSE); X } X X /* fork() off the child process */ X Mesg((mbuf,"RunPmesg forking process\n")) X ppid = (pid_t)fork(); X X /* parent? */ X if (ppid > (pid_t)0) { X Mesg((mbuf,"RunPmesg parent closing read end of pipe\n")) X (void)close(pmsg[0]); X return (TRUE); X } X X /* error? */ X if (ppid < (pid_t)0) { X perror("team: RunPmesg: error forking child process"); X return (FALSE); X } X X /* child */ X Mesg((mbuf,"RunPmesg child closing write end of pipe\n")) X (void)close(pmsg[1]); X X /* read/write loop, until no more data */ X#ifdef DEBUG X if (debug) X eol = '\n'; X#endif X Mesg((mbuf,"RunPmesg child beginning read/write loop\n")) X while ((n = (ssize_t)read(pmsg[0],&buf,(size_t)(sizeof buf))) > 0) { X Mesg((mbuf,"RunPmesg child %d read\n",n)) X if (buf.pmb_type == 'R') { X num_read = buf.pmb_bytes; X read_blocks += buf.pmb_recs; X if (last_read_num != buf.pmb_num) { X last_read_num = buf.pmb_num; X ++read_cnt; X } X } X else { X num_writ = buf.pmb_bytes; X writ_blocks += buf.pmb_recs; X if (last_writ_num != buf.pmb_num) { X last_writ_num = buf.pmb_num; X ++writ_cnt; X } X } X X seconds = time((time_t*)0) - origin; X if (seconds <= 0) X seconds = 1; X X /* the (') in the format is supposed to give thousands groupings ... */ X fprintf(stderr,"%'u/%'u %'llu in, %'u/%'u %'llu out, %'u secs, %'d KB/sec %c", X read_cnt,read_blocks,num_read,writ_cnt,writ_blocks,num_writ,seconds,(int)(((num_read+num_writ)>>11)/seconds),eol); X } X Mesg((mbuf,"RunPmesg child %d read\n",n)) X X /* child never returns */ X Mesg((mbuf,"RunPmesg child exiting\n")) X exit(0); X} X#endif X X X /************************** X **************************/ X X X/* X A guy is an instance of the input to output copier. It is attached X to a relay station, with an upstream link, from which commands X arrive, and a downward link, to which they are relayed once they are X executed. X*/ X X/* X * setup a Guy struct; X * assign the PID, and initial assignments for the message ring; X * returns 1 of both fd's look OK; X */ Xstatic bool XGuyInit( X reg X Guy *guy, /* Guy */ X short num, /* guy number: 1, 2, ... */ X pid_t pid, /* Process ID */ X int inring, /* read data channel */ X int outring /* write data channel */ X) X{ X Mesg((mbuf,"GuyInit pid %ld, inring %d, outring %d\n", X (long)pid,inring,outring)) X X guy->num = mynum; X guy->pid = pid; X guy->inRing = inring; X guy->outRing = outring; X X return (inring >= 0 && outring >= 0); X} X X/* X * how a Guy dies... X * the Guy process will not return from this routine; X */ XGuyNormalExit( X Guy* guy, X short status, X bool got_the_eof, X bf_size bytesProcessed X) X{ X /* X * close the WRITE end of the message ring pipe; X */ X Mesg((mbuf,"GuyNormalExit guy %d closing %d\n",mynum,guy->outRing)) X if (close(guy->outRing) != 0) X Mesg((mbuf,"GuyNormalExit guy %d problem closing %d\n",mynum,guy->outRing)) X X /* X * if this is the Guy that got the data EOF, X * wait for the other guys to die... X */ X if (got_the_eof) { X bool do_volumes; X bf_size bytesProcessed; X X /* X * when the previous guy dies (ie, the close() has worked X * its way around the ring), this read should return; X * we don't care about the exit status; X */ X Mesg((mbuf,"GuyNormalExit guy %u, waiting for inring close\n",mynum)) X (void)RingReceive(guy->inRing,&do_volumes,&bytesProcessed); X Mesg((mbuf,"GuyNormalExit guy %u, got inring close\n",mynum)) X } X X /* X * print a report X */ X if (got_the_eof) { X if (verbose) X (void)fprintf(stderr,"\n"); X if (report) { X bf_size kilobytes = (bytesProcessed + 512) >> 10; X bf_size seconds = time((time_t*)0) - origin; X double megabytes = (double)kilobytes / 1024; X int krate; /* rate, KB/sec */ X double mrate; /* rate, MB/sec */ X X if (seconds <= 0) X seconds = 1; X krate = kilobytes/seconds; X mrate = megabytes/seconds; X X /*(void)sprintf(mbuf,"%llu kilobytes, %llu seconds\n", */ X /* bytesProcessed>>10,(bf_size)( time((time_t*)0) - origin )); */ X /*(void)sprintf(mbuf, X "%llu bytes, %llu KB, %llu seconds, %d KB/sec\n", X bytesProcessed,kilobytes,seconds,krate);*/ X X (void)sprintf(mbuf, X "%llu bytes, %llu KB, %3.1f MB, %llu seconds, %d KB/sec, %4.2f MB/sec\n", X bytesProcessed,kilobytes,megabytes,seconds,krate,mrate); X X#ifdef DEBUG X if (debug) X mesg(msgbuf); X else X mesg(mbuf); X#else X mesg(mbuf); X#endif X } X#if 0 X else if (verbose) X (void)fprintf(stderr,"\n"); X#endif X } X X Mesg((mbuf,"GuyNormalExit guy %d, exiting with %d\n",mynum,status)) X exit (status); X} X X/* X * how a Guy dies abnormally... X */ XGuyAbnormalExit( X Guy* guy, X short status, X char* errmesg X) X{ X if (errmesg != (char *)0) { X (void)sprintf(mbuf,"guy pid %ld: %s\n",(long)guy->pid,errmesg); X#ifdef DEBUG X mesg(msgbuf); X#else X mesg(mbuf); X#endif X } X X GuyNormalExit(guy,status,0,(bf_size)0); X} X X/* X * a Guy will loop inside this routine reading and writing X * until its death; X */ Xstatic bool XGuyRun( X reg Guy *guy, /* read/write process identifier */ X int starter, /* start the initial messages in the ring? */ X char *buffer, /* the read/write data buffer */ X int numbufs, /* the number of buffers */ X size_t bufsize, /* I/O buffer size in bytes */ X /*size_t iblksize, /* input block size */ X size_t oblksize /* output block size */ X) X{ X bf_size bytesProcessed; /* num bytes transmitted thus far */ X bool do_volumes, /* volume flag, passed in messages */ X io_eof = 0; /* read data EOF */ X short receive_status, /* return from RingReceive() */ X io_state = IO_READ; /* read or write state flip-flop */ X ssize_t bytesRead, /* num bytes read into buffer */ X bytesWritten; /* num bytes written from buffer */ X /*boffset; /* record offset in buffer */ X size_t totalbufsize; /* bufsize * numbufs */ X int /*bufnum, /* counter for reading N buffers */ X numobufs; /* number of output buffers */ X X Mesg((mbuf,"GuyRun guy %u, bufsize %u, numbufs %u\n",mynum,bufsize,numbufs)) X X if (starter) { X /* X * if this is the last Guy created: X * start the token passing for reading and writing; X * the leader used to do this, but this might be safer; X */ X Mesg((mbuf,"GuyRun sending initial READ\n")) X if (!RingSend(guy->outRing,use_volumes,(bf_size)0)) X GuyAbnormalExit(guy,1,"guy cannot send initial READ"); X X Mesg((mbuf,"GuyRun sending initial WRITE\n")) X if (!RingSend(guy->outRing,use_volumes,(bf_size)0)) X GuyAbnormalExit(guy,1,"guy cannot send initial WRITE"); X } X X /* X * figure out total input size and number of output blocks X */ X totalbufsize = bufsize * numbufs; X if (bufsize != oblksize) { X if (oblksize > totalbufsize) { X totalbufsize = oblksize; X numobufs = 1; X } X else X numobufs = totalbufsize / oblksize; X } X else X numobufs = numbufs; X X /* X * read messages from the message ring associated with this Guy; X * this should continue until EOF, an error, or a STOP or ABORT X * from another Guy; X * X * for the most part, the message tokens should alternate between X * READ and WRITE; it should be possible to collapse these two into X * one token, and just have this routine flip-flop between read and X * write when it receives the token... X */ X for ( ; ; ) { X receive_status = RingReceive(guy->inRing,&do_volumes,&bytesProcessed); X if (receive_status == REC_ERROR) X GuyAbnormalExit(guy,1,"bad read on message ring"); X X if (receive_status == REC_EOF) X break; X X /* (receive_status == REC_OK) */ X X if (io_state == IO_READ) { X /* X * read in buffers of data from the data channel X */ X Mesg((mbuf,"GuyRun reading %u buffers, %u bufsize\n",numbufs,bufsize)) X Mesg((mbuf,"GuyRun reading %d bytes\n",totalbufsize)) X X bytesRead = FdRead(&FdIn,(pointer)buffer,(size_t)totalbufsize,(size_t)bufsize,(bf_size)bytesProcessed,&do_volumes,guy->num); X X Mesg((mbuf,"GuyRun reads %d bytes\n",bytesRead)) X X if (bytesRead < (ssize_t)0) { X Mesg((mbuf,"GuyRun error reading data\n")) X GuyAbnormalExit(guy,1,"bad read on data channel"); X } X X bytesProcessed += bytesRead; X X if (bytesRead < (ssize_t)totalbufsize) X io_eof = 1; X X if (verbose == 1 && bytesRead > (ssize_t)0) X vmesg('R',guy->num,(int)((bytesRead+(bufsize-1))/bufsize),bytesProcessed); X/*fprintf(stderr,"\n((%lu+(%u-1))/%u)",bytesRead,bufsize,bufsize);*/ X/*fprintf(stderr,"\nbufsize = %u\n",bufsize);*/ X } X else /* if (io_state == IO_WRITE) */ { X /* X * write out buffers of data to the data channel X */ X if (bytesRead > 0) { X Mesg((mbuf,"GuyRun writing %u buffers, %u bufsize\n",numobufs,bufsize)) X Mesg((mbuf,"GuyRun writing %d bytes\n",bytesRead)) X X bytesWritten = FdWrite(&FdOut,(pointer)buffer,(size_t)bytesRead,(size_t)oblksize,(bf_size)bytesProcessed,&do_volumes,guy->num); X X Mesg((mbuf,"GuyRun writes %d bytes\n",bytesWritten)); X X if (bytesWritten < (ssize_t)0) { X Mesg((mbuf,"GuyRun error writing data\n")) X GuyAbnormalExit(guy,1,"bad write on data channel"); X } X X bytesProcessed += bytesWritten; X X if (verbose == 1) X vmesg('W',guy->num,(int)((bytesWritten+(oblksize-1))/oblksize),bytesProcessed); X/*fprintf(stderr,"\n((%lu+(%u-1))/%u)",bytesWritten,oblksize,oblksize);*/ X/*fprintf(stderr,"\noblksize = %u\n",oblksize);*/ X X if (bytesWritten < bytesRead) { X Mesg((mbuf,"GuyRun error, did not write full data buffer\n")) X GuyAbnormalExit(guy,1,"incomplete write on data channel"); X } X } X if (io_eof) X break; X } X X if (!io_eof) { X /* X * now, pass on the IO token to the next Guy X */ X if (!RingSend(guy->outRing,do_volumes,bytesProcessed)) { X if (io_state == IO_READ) { X Mesg((mbuf,"GuyRun guy %d, cannot send READ message\n",mynum)) X }else { X Mesg((mbuf,"GuyRun guy %d, cannot send WRITE message\n",mynum)) X } X GuyAbnormalExit(guy,1,"error sending I/O message"); X } X } X X /* flip the state */ X io_state ^= 1; X } X X if (receive_status == REC_EOF) { X if (io_state == IO_WRITE) X GuyAbnormalExit(guy,1,"did not receive expected WRITE message"); X /* X * this is the normal exit for a Guy that was not the one X * to read the data EOF; X */ X GuyNormalExit(guy,0,0,(bf_size)0); X } X X /* X * this should be the Guy that got the data EOF X */ X GuyNormalExit(guy,0,1,bytesProcessed); X X /*NOTREACHED*/ X /*return (TRUE);*/ X} X X X /************************** X **************************/ X X X/* X A team is made up of a ring of guys; each guy copies a block from its X input to its ouput, and is driven by tokens sent to it on a pipe X by the previous guy in the ring. X*/ X X/* X Initialize the team by allocating the Guy structs X and setting the count; X */ Xstatic bool XTeamInit( X Team *team, /* the Team */ X unsigned num_guys /* the number of Guys in the Team */ X) X{ X Mesg((mbuf,"TeamInit num_guys %u\n",num_guys)) X X team->size = num_guys; X team->active = 0; X X team->guys = (Guy *)calloc((size_t)num_guys,(size_t)(sizeof (Guy))); X X if (team->guys == (Guy *)0) X return (FALSE); X X return (TRUE); X} X X/* X When generating each guy, we pass it an upstream link that X is the downstream from the previous guy, and create a new X downstream link that will be the next upstream. X X At each turn we obviously close the old downstream once it X has been passed to the forked guy. X X Special cases are the first and last guys; the upstream of X the first guy shall be the downstream of the last. This X goes against the grain of our main logic, where the X upstream is expected to already exist and the downstream X must be created. X X This means that the last and first guys are created in a X special way. When creating the first guy we shall create X its upstream link as well as its downstream, and we shall X save that in a special variable, last_downstream. This we X shall use as the downstream of the last guy. X X We shall also keep it open in the team manager (parent X process) because we shall use it to do the initial send of X the read and write tokens that will circulate in the relay X ring, activating the guys. X X Of course because of this each guy will inherit this link X as well as its upstream and downstream, but shall graciously X close it. X*/ X X/* X * this routine initializes the Guys, etc, and runs all of the I/O; X * returns TRUE on success; X */ Xstatic bool XTeamRun( X reg X Team *team, /* the Team */ X size_t iblksize, /* input block size */ X size_t oblksize, /* output block size */ X int numbufs /* output block size */ X#if 0 X bf_size Isize, /* input logical volume size in bytes */ X bf_size Osize /* output logical volume size in bytes */ X#endif X) X{ X int last_outring, /* this will be the outring link between X * the last Guy and the first Guy */ X next_inring, /* this will become my_inring for the Guy */ X my_inring, /* each Guy will read his own copy of this */ X my_outring; /* each Guy will write his own copy of this */ X char *Buffer; /* pointer to the main read/write data buffer */ X#if 0 X size_t bufsize = MAX(iblksize,oblksize); X#else X size_t bufsize = iblksize; X size_t maxbufsize = iblksize * numbufs; X#endif X X Mesg((mbuf,"TeamRun team %d, size %u, numbufs %u, bufsize %u, oblksize %u\n",0,team->size,numbufs,bufsize,oblksize)) X Mesg((mbuf,"TeamRun leader pid %ld\n",(long)getpid())) X X /* X * allocate space for the data buffer; X * each Guy will get its own copy via the fork(); X */ X if (oblksize > maxbufsize) X maxbufsize = oblksize; X /*Buffer = (pointer)calloc((size_t)numbufs,(size_t)bufsize);*/ X Buffer = (pointer)memalign((size_t)1024,(size_t)maxbufsize); X if (Buffer == (pointer)0) { X (void)sprintf(mbuf,"cannot allocate %u * %u bytes for the Buffers\n",maxbufsize,numbufs); X mesg(msgbuf); X return (FALSE); X } X X /* X * fork a process for each Guy in the Team; X * need to do alot of file descriptor shuffling to make a ring X * of pipes for them to communicate (pass tokens) on; X */ X for (team->active=0; team->active < team->size; ++team->active) { X reg Guy *guy; X reg pid_t pid; X X guy = &team->guys[team->active]; X X /* X * this is the first one, special case; X * open a pipe that the first Guy will read (my_inring), X * and keep the write side (last_outring) open for the X * final Guy to use as his my_outring (the Team leader X * will also write the initial tokens to this); X * for other Guys, next_inring gets converted to my_inring, X * see below; X */ X if (team->active == 0) { X if (!RingPipe(&my_inring,&last_outring)) { X perror("team: cannot open first ring pipe"); X return (FALSE); X } X } X X /* X * for all but the final Guy, this is the pipe that this Guy X * will write to, and the next one will read from; X */ X if (team->active < team->size - 1) { X if (!RingPipe(&next_inring,&my_outring)) { X perror("team: cannot open ring pipe"); X return (FALSE); X } X } X /* X * this is the last one, special case; X * the Guy will already have my_inring correctly set; we X * just need to set my_outring (a copy of last_outring); X */ X else /* (team->active == team->size - 1) */ { X my_outring = last_outring; X last_outring = -1; X } X X Mesg((mbuf,"TeamRun forking for guy %d\n",team->active + 1)) X X /* X * a Guy is born... X */ X pid = (pid_t)fork(); X X /* X * parent process X */ X if (pid > (pid_t)0) { X Mesg((mbuf,"TeamRun forked guy %d as pid %ld\n",team->active,(long)pid + 1)) X guy->pid = pid; X guy->num = team->active + 1; X X /* X * if that was not the final Guy, close the read side X * of the pipe from the previous Guy, and close the X * write side of the pipe to the new Guy; X * also, hold a copy of the read side of the pipe to X * to the next Guy where he'll be expecting it; X */ X if (team->active < team->size - 1) { X if (close(my_inring) != 0) X perror("team: cannot close this inring link"); X my_inring = -1; X X if (close(my_outring) != 0) X perror("team: cannot close this outring link"); X my_outring = -1; X X my_inring = next_inring; X next_inring = -1; X } X /* X * else, if that was the final Guy, close the read side X * of the pipe to the Guy; X */ X else { X if (close(my_inring) != 0) X perror("team: cannot close this inring link"); X my_inring = -1; X } X } X /* X * child process X */ X else if (pid == (pid_t)0) { X pid = (pid_t)getpid(); X X /* for messages */ X mynum = team->active + 1; X#ifdef DEBUG X if (debug) { X /*(void)sprintf(myname,"guy%d",team->active);*/ X (void)sprintf(myname,"guy%d",mynum); X (void)sprintf(msgbuf,"%s: ",myname); X mbuf = &msgbuf[strlen(msgbuf)]; X } X#endif X X#if 0 X /* Set SIGPIPE handling back to the default in the guys */ X /*signal(SIGPIPE, SIG_DFL);*/ X (void)sigrelse(SIGPIPE); X#endif X /* X * if this is not the final Guy, close the write side X * going to the first pipe to the first Guy, and X * close the read side of the pipe that goes to the X * next Guy; X */ X if (team->active < team->size - 1) { X if (close(last_outring) != 0) X perror("team: Guy cannot close last_outring"); X last_outring = -1; X X if (close(next_inring) != 0) X perror("team: Guy cannot close next_inring"); X next_inring = -1; X } X /* X * else, if this is the final Guy, he's already got X * exactly what he needs: the read end of a pipe from X * the prev. Guy, and the write end of a pipe to the X * first Guy; X */ X else /* (team->active == team->size - 1) */ { X /*EMPTY*/ X } X X if (!GuyInit(guy,mynum,pid,my_inring,my_outring)) X GuyAbnormalExit(guy,1,"cannot init guy"); X X /* should never return from GuyRun */ X#if 0 X if (!GuyRun(guy,(mynum == team->size),Buffer,numbufs,bufsize,iblksize,oblksize)) X#else X if (!GuyRun(guy,(mynum == team->size),Buffer,numbufs,bufsize,oblksize)) X#endif X GuyAbnormalExit(guy,1,"cannot run guy"); X X /* X * shouldn't get here X */ X GuyAbnormalExit(guy,1,"how did I get here?"); X X exit(1); X X /*NOTREACHED*/ X } X /* X * Oops! X */ X else if (pid < (pid_t)0) { X perror("team: problem forking"); X return (FALSE); X } X } X X /* X * only parent gets to this point X */ X X if (close(my_outring) != 0) X perror("team: cannot close first link"); X my_outring = -1; X X#ifdef PMESG X /* X * close on the pipe to the PMESG process X */ X if (verbose) X (void)close(pmsg[1]); X#endif X X return (TRUE); X} X X/* X * wait for each of the Guy processes to die off; X * returns FALSE if any of them died badly; X */ Xstatic bool XTeamWait( X reg Team *team X) X{ X pid_t guypid; /* pid of a child process */ X int status; /* return status of a child process */ X reg unsigned guyno; /* used to index loop */ X int rval = TRUE; /* return value */ X X for ( ; ; ) { X#ifdef PMESG X /* might have the PMESG child process, too */ X if (verbose && team->active == 0) { X Mesg((mbuf,"TeamWait done waiting\n")) X break; X } X#endif X status = 0; X Mesg((mbuf,"TeamWait waiting, %d active\n",team->active)) X guypid = (pid_t)wait(&status); X Mesg((mbuf,"TeamWait got pid %ld, status %d\n",(long)guypid,status)) X X if (guypid == (pid_t)-1) { X /* wait was interrupted */ X if (errno == EINTR) X continue; X if (team->active != 0) { X (void)sprintf(mbuf,"no guys remaining, believed %u left\n",team->active); X mesg(msgbuf); X } X return(rval); X } X X /* child was stopped or continued */ X if (WIFSTOPPED(status) || WIFCONTINUED(status)) X continue; X#ifdef PMESG X if (verbose && guypid == ppid) { X Mesg((mbuf,"TeamWait pid %ld was PMESG process\n",(long)guypid)) X continue; X } X#endif X X /* mark the Guy that died */ X for (guyno=0; guyno < team->size; ++guyno) X if (guypid == team->guys[guyno].pid) { X team->guys[guyno].pid = (pid_t)0; X --team->active; X break; X } X if (guyno == team->size) X continue; X X X /* child had non-zero exit */ X if (WEXITSTATUS(status)) { X guyhaderror = TRUE; X rval = FALSE; X continue; X } X X /* child died from a signal */ X if (WIFSIGNALED(status)) { X rval = FALSE; X continue; X } X } X} X X/* X * kill off any remaining child (guy) processes; X * returns 1 if all child processes seem accounted for; X */ Xstatic bool XTeamStop( X reg Team *team X) X{ X reg unsigned guyno; X X Mesg((mbuf,"TeamStop team %d\n",0 /*team*/)) X X for (guyno=0; guyno < team->size; ++guyno) { X reg Guy *guy; X X guy = &team->guys[guyno]; X X if (guy->pid > (pid_t)0) { X X (void)kill(guy->pid,SIGKILL); X X --team->active; X } X } X X return (team->active == 0); X} X X/* X * resets the Team struct, frees allocated data; X * this doesn't really do anything useful, and should be reworked; X * always returns TRUE; X */ Xstatic bool XTeamRelease( X reg Team *team X) X{ X team->active = 0; X team->size = 0; X (void)free(team->guys); X team->guys = (Guy*)0; X X return (TRUE); X} X X X /************************** X **************************/ X X X/* X * print Usage message and exit with status; X */ Xstatic void Xusage( X int status /* exit status */ X) X{ X FILE *ostream = (status ? stderr : stdout); X X if (!status) X (void)fprintf(ostream,"\ X\n\ X%s:\n\ X\n\ X`%s' is a program intended to speed data copying -- possibly to or\n\ Xfrom some device -- by parallelizing reads and writes, thus increasing\n\ Xthroughput. You can specify via the command line the number of processes\n\ Xto use (each provides one or more read/write buffers), and also the buffer\n\ Xsize to use (which will also be used as the default input and output block\n\ Xsize) and the number of buffers per process. If an output block size is\n\ Xspecified, it must be a factor of the product of the buffer size and the\n\ Xnumber of buffers per process, or it must be a multiple of the product.\n\ X\n\ X`%s' also supports the concept of volume sizes, and will prompt for an\n\ Xaction when a volume size is reached. This allows you to, for example,\n\ Xcopy to or from multiple fixed-size diskettes. This feature has not been\n\ Xas thoroughly tested as the rest of the code. Responses to the volume\n\ Xprompt are one of: `c', continue, resetting the pointer to the beginning\n\ Xof the new volume; `y', just continue; and `n', terminate the program.\n\ XVolume management will only be invoked if one of `-m', `-I', or `-O' are\n\ Xspecified.\n\ X", X prog, X prog, X prog X ); X (void)fprintf(ostream,"\ X\n\ XUsage: %s [-%svrmh] [-o num[X]] [-I num[X]] [-O num[X]] \\\n\ X [ bufsiz[X] [ procs [bufs] ] ]\n\ X\n\ X", X prog, X#ifdef DEBUG X "d" X#else X "" X#endif X ); X X X if (!status) X (void)fprintf(ostream,"\ X Copy standard input to standard output with multi-process buffering;\n\ X\n\ X%s -v verbose, report ongoing status (use twice for verboser);\n\ X -r summary report;\n\ X -m use volume management;\n\ X -h print help info and exit;\n\ X -o output block size (default is bufsiz);\n\ X -I input volume size (default is %lluGB);\n\ X -O output volume size (default is %lluGB);\n\ X bufsiz buffer size (default is %uKB);\n\ X procs number of processes (default is %u);\n\ X bufs number of buffers per process (default is %u);\n\ X\n\ X", X#ifdef DEBUG X " -d debug, lots of messages;\n", X#else X "", X#endif X TeamDVOLSZ>>30, X TeamDVOLSZ>>30, X TeamDBUFSZ>>10, X TeamDTEAMSZ, X TeamDNBUFS X ); X if (!status) X (void)fprintf(ostream,"\ X Factor X can be one of: b for *512, k for *1KB, m for *1MB, g for *1GB.\n\ X" X ); X if (status) X (void)fprintf(ostream,"\ X %s -h (for help info)\n\ X", X prog X ); X#if 0 X if (!status) X (void)fprintf(ostream,"\ X Of course, use figures that are appropriate for your system.\n\ X" X ); X#endif X (void)fprintf(ostream,"\ X\n\ X" X ); X X exit(status); X /*NOTREACHED*/ X X /* keeps lint happy */ X return (0); X} X X/* X * convert a numeric string to a bf_size integer; X * adjust for scale suffix; X */ Xstatic bf_size Xatos( X reg char *p X) X{ X reg bf_size num = 0; X reg char c; X X for ( ; *p >= '0' && *p <= '9'; ++p) X num = num * 10 + (*p - '0'); X X c = *p | '\040'; /* allow for upper or lower case */ X X if (c == 'b') num <<= 9; /* block */ X else if (c == 'k') num <<= 10; /* kilobyte */ X else if (c == 'm') num <<= 20; /* megabyte */ X else if (c == 'g') num <<= 30; /* gigabyte */ X X return (num); X} X X/* X * return the component after the last slash in the path string; X */ Xstatic char* Xbasename(path) Xregister char X *path; X{ X register char X *p = path; X X for ( ; *p != '\0'; ++p) X if (*p == '/') X path = p + 1; X X return(path); X} X X X /************************** X **************************/ X X Xextern int Xmain( X int argc, X char *argv[] X) X{ X Team team; /* the top level data struct for processes X * and data channels (pipes) */ X unsigned teamsize; /* number of Guy processes to spawn */ X size_t bufsize; /* I/O buffer size in bytes */ X size_t isize = 0; /* input block size in bytes */ X size_t osize = 0; /* output block size in bytes */ X bf_size Isize; /* input logical volume size in bytes */ X bf_size Osize; /* output logical volume size in bytes */ X int opt; /* used for option processing */ X#ifdef DEBUG X char* optstr = "dvrmho:I:O:"; /* command line options */ X#else X char* optstr = "vrmho:I:O:"; X#endif X short oflag = 0, X Iflag = 0, X Oflag = 0; X X prog = basename(argv[0]); X X /* X * set the defaults X */ X teamsize = TeamDTEAMSZ; X bufsize = TeamDBUFSZ; X Isize = TeamDVOLSZ; X Osize = TeamDVOLSZ; X X /* X * set up for messages X */ X mynum = 0; X /*(void)strcpy(myname,"leader");*/ X (void)strcpy(myname,prog); X (void)sprintf(msgbuf,"%s: ",myname); X mbuf = &msgbuf[strlen(msgbuf)]; X X /* X * process command line options X */ X while (( opt = getopt(argc,argv,optstr) ) != -1) { X X switch (opt) { X case 'd': X debug ^= 1; X break; X X case 'v': X /*verbose ^= 1;*/ X ++verbose; X break; X X case 'r': X report ^= 1; X break; X X case 'm': X use_volumes ^= 1; X break; X X case 'h': X usage(0); X X case 'o': X osize = atos(optarg); X if (osize <= (size_t)0) { X (void)fprintf(stderr,"team: invalid output block size %llu\n",osize); X usage(1); X } X oflag = 1; X break; X X case 'I': X Isize = atos(optarg); X if (Isize < TeamLVOLSZ || Isize > TeamHVOLSZ) { X (void)fprintf(stderr,"team: invalid input volume size %llu\n",Isize); X usage(1); X } X Iflag = 1; X break; X X case 'O': X Osize = atos(optarg); X if (Osize < TeamLVOLSZ || Osize > TeamHVOLSZ) { X (void)fprintf(stderr,"team: invalid output volume size %llu\n",Osize); X usage(1); X } X Oflag = 1; X break; X X default: X /*(void)fprintf(stderr,"team: unknown option \"%c\"\n",opt);*/ X usage(1); X } X } X argc -= optind; X argv += optind; X X /* X * command line arguments X */ X X /* X * I/O buffer size X */ X if (argc != 0) { X bf_size tbufsize; /* temp holder for bufsize */ X X /* tbufsize can hold a bigger number than bufsize */ X tbufsize = atos(argv[0]); X Mesg((mbuf,"bufsize = %lld\n",tbufsize)) X if (tbufsize < TeamLBUFSZ || tbufsize > TeamHBUFSZ) { X (void)fprintf(stderr,"team: invalid block size %llu\n", X tbufsize); X usage(1); X } X bufsize = tbufsize; X --argc, ++argv; X } X#if 0 X(void)fprintf(stderr,"address bytes = %d\n",sizeof(address)); X(void)fprintf(stderr,"bf_size bytes = %d\n",sizeof(bf_size)); Xexit(0); X#endif X X /* X * number of processes X */ X if (argc != 0) { X teamsize = atoi(argv[0]); X if (teamsize < TeamLTEAMSZ || teamsize > TeamHTEAMSZ) { X (void)fprintf(stderr,"team: invalid number of processes %d\n",teamsize); X usage(1); X } X --argc, ++argv; X } X X /* X * number of buffers per process X */ X if (argc != 0) { X num_bufs = atoi(argv[0]); X if (num_bufs < TeamLNBUFS || num_bufs > TeamHNBUFS) { X (void)fprintf(stderr,"team: invalid number of buffers %d\n",num_bufs); X usage(1); X } X --argc, ++argv; X } X X if (argc != 0) { X (void)fprintf(stderr,"team: too many operands\n"); X usage(1); X } X X if (Iflag || Oflag) X use_volumes = TRUE; X X if (oflag) { X size_t remain; X size_t tbufsize = bufsize * num_bufs; X X if (tbufsize >= osize) X remain = tbufsize % osize; X else X remain = osize % tbufsize; X if (remain) { X (void)fprintf(stderr,"team: output block size must be a multiple of bufsize\n"); X usage(1); X } X } X else X osize = bufsize; X isize = bufsize; X X /* X * initialize the primary I/O channels: we read stdin, write stdout X */ X if (!FdOpen(&FdIn, 0,Isize)) { X (void)sprintf(mbuf,"problem opening stdin\n"); X mesg(msgbuf); X return (1); X }; X if (!FdOpen(&FdOut,1,Osize)) { X (void)sprintf(mbuf,"problem opening stdout\n"); X mesg(msgbuf); X return (1); X }; X X /* X * get the starting time X */ X origin = time((time_t *) 0); X X#ifdef PMESG X /* X * fork() child process to read and report '-v' (verbose) messages X */ X if (verbose) X if (!RunPmesg()) { X (void)sprintf(mbuf,"could not run pmesg process\n"); X mesg(msgbuf); X return (1); X }; X#endif X X /* X * initialize the Team struct X */ X if (!TeamInit(&team,teamsize)) { X (void)sprintf(mbuf,"cannot setup the team with %d guys\n",teamsize); X mesg(msgbuf); X return (1); X } X X#if 0 X /* X * Ignore SIGPIPE in the parent as it affects the exit status reporting. X */ X /*signal(SIGPIPE, SIG_IGN);*/ X (void)sigignore(SIGPIPE); X#endif X X /* X * this will further initialize structs, and do the entire copy; X */ X#if 0 X if (!TeamRun(&team,bufsize,Isize,Osize)) { X#else X if (!TeamRun(&team,isize,osize,num_bufs)) { X#endif X (void)sprintf(mbuf,"cannot start the team\n"); X mesg(msgbuf); X return (1); X } X X /* X * wait for each of the Guy processes to die off; X */ X if (!TeamWait(&team)) { X (void)sprintf(mbuf,"stop remaining %u guys\n",team.active); X mesg(msgbuf); X X /* X * try to kill off any remaining Guy processes; X */ X if (!TeamStop(&team)) { X (void)sprintf(mbuf,"cannot stop the team\n"); X mesg(msgbuf); X return (1); X } X } X X /* X * reset the Team struct; X */ X if (!TeamRelease(&team)) { X (void)sprintf(mbuf,"cannot release the team\n"); X mesg(msgbuf); X return (1); X } X X if (guyhaderror) { X (void)sprintf(mbuf,"guy had error\n"); X mesg(msgbuf); X return (1); X } X X /* X * close the primary I/O channels X */ X (void)FdClose(&FdIn); X (void)FdClose(&FdOut); X X Mesg((mbuf,"normal exit")) X return (0); X} SHAR_EOF if test 51880 -ne "`wc -c < 'team.c'`" then echo shar: error transmitting "'team.c'" '(should have been 51880 characters)' fi fi # end of overwriting check echo shar: extracting "'team.1.dist'" '(5406 characters)' if test -f 'team.1.dist' then echo shar: will not over-write existing file "'team.1.dist'" else sed 's/^ X//' << \SHAR_EOF > 'team.1.dist' X'\" Copyright 1987,1989 Piercarlo Grandi. All rights reserved. X.TH TEAM 1 (pg) X.SH NAME Xteam \- parallel "pipe", allows asynchronous io X.SH SYNOPSIS X.B team X.RB [ -r ] X.RB [ -v ] X.RB [ -i X.I volsize X.RB [ b | k | m ]] X.RB [ -o X.I volsize X.RB [ b | k | m ]] X.RI [ blocksize X.R [ X.RB [ b | k | m ] X.RB [ processes ]] X.SH DESCRIPTION X.I Team Xjust copies its standard input to its standard output. It does so Xhowever forking a team of independent X.I processes X(default is 4), arranged in a ring, with reads overlapped Xwith writes. X.LP XEach process will wait for the end of the read phase of Xprevious process, will then read X.I blocksize Xbytes (or 512 byte blocks if suffixed with X.B b Xor kilobytes if suffixed with X.BR k , Xor megabytes if suffixed with X.BR m ; Xthe default is X.BR 20k ) Xfrom its standard input, activate the next process read Xphase, wait for the previous process write phase end, then Xwrite to its standard output, and activate the next process Xwrite phase. X.LP XIf the input or output volume ends or an IO error is detected, X.I team Xwill ask wehtether the user wants to continue (but only if the Xconcerned volume is a block or character device and the program's Xstandard error channel is a tty device). Possible answers are X.TP X.B c XThis means that the user wishes to continue, but the file pointer shall be Xreset to the beginning of the volume, as a new volume has been started Xanew. This is the the answer to give on end of volume when writing to Xmultiple floppies, etc... X.TP X.B y XThe user simply wishes to continue, without any change. X.TP X.B n XThe user wishes to stop; the program will be terminated or aborted. X.LP XThere are just three options, as follows: X.TP X.B -i XThe value that follows the option is the assumed volume size of the input, Xand it is expressed in the same units as the buffer size. X.TP X.B -o XThe value that follows the option is the assumed volume size of the output, Xand it is expressed in the same units as the buffer size. X.TP X.B -v XIf specified as each buffer is read or written the total number of kilobytes Xread or written that far is printed. X.TP X.B -r XIf specified the number of kilobytes processed and the number of seconds Xtaken is X.B not Xprinted at the end of the run. X.LP X.I Team Xconsumes system time to synchronize and task switch among Xits processes; also, in order to avoid slowing it, it is Xbest run on a quiescent system. X.LP XThis program is most useful for output to a device, especially Xwhere a streaming tape is involved. It may be used to advantage with Xdisc to disc and disc to tape copies. X.SH EXAMPLES Xfind dir -print | cpio -oBc | team 20k 8 >/dev/rmt0 X.br Xteam 20k 4 /dev/rdsk/f0t X.SH ADVICE XYou are advised to experiment with different combinations of block size and Xnumber of processes; each program used with X.I team Xworks best with certain parameters, and performance depends even more Xstrongly on the output device, so experiment with parameters also for Xthis (it seems that the blocking factor of the process that feeds X.I team Xought to be inferior to that given to it, and possibly inferior to the Xlimit on the size of a pipe for your version of the system). X.I Team Xought to be adaptive, and adjust dynamically both parameters, in order to Xreach a state where there is no pause between each stage of the ring. This is Xtoo difficult to achieve under UNIX. X.LP XNotice also that this program will read and write blocks all of the Xsame size as prescribed, except the last, even when reading from Xpipes; if a read from its input supplies less bytes than the prescribed Xblock size, this program will read again until its buffer is filled Xto norm or the input finishes. X.LP XA final note: it is usually advantageous to give to X.I team Xa block size that is a multiple of the block size produced by the Xprogram before it in a pipeline. Notice that in many cases, such Xas the tape archival programs, the output will not be directly Xrecognizable to the tape archiver in input, but will have to be reblocked Xback to the blocksize expected by the tape archiver either by way Xof X.I dd Xor reapplication of X.IR team , Xthat is much faster of course. X.SH BUGS X.I Team Xwill emit a number of messages comprehensible only to the author in case Xof errors. Plase note them and report them to the author. X.LP XThis is not strictly a bug in this program, but rather a limitation; some Xdevice drivers will have problems when you change volume when this program Xasks you whether to continue operation. They require that the device be Xclosed and opened again whenever a volume is changed. Unfortunately this Xcannot be done, given the structure of X.IR team ; Xwith such device drivers you effectively cannot use team to write multiple Xvolumes. X.LP XSome device drivers, on physical end of file or volume while writing do Xnot do the decent thing, and write a legible truncated block and return Xits length; some drivers, e.g. some tape drivers, handle physical eof Xon write quite badly. With these drivers you had better use the X.B -o Xoption to set a logical EOF if you want to use multiple volumes. As an hint, Xgive the volume size as about five percent less than the nominal. Whatever Xvalue you use for output, take not of it, as you will have to use exactly the Xsame value for input! X.SH SEE ALSO X.IR volcopy (8) X.br X.IR cpio (1) X.br X.IR tar (1) X.br X.IR dump (8) X.SH AUTHOR XPiercarlo Grandi, Milano. SHAR_EOF if test 5406 -ne "`wc -c < 'team.1.dist'`" then echo shar: error transmitting "'team.1.dist'" '(should have been 5406 characters)' fi fi # end of overwriting check echo shar: extracting "'team.c.dist'" '(23313 characters)' if test -f 'team.c.dist' then echo shar: will not over-write existing file "'team.c.dist'" else sed 's/^ X//' << \SHAR_EOF > 'team.c.dist' X/* X $Header: /fs/mumon/aware/piercarl/Cmd./Commands./team.c,v 3.1 1994/01/13 15:03:25 piercarl Exp piercarl $ X*/ X Xstatic char Notice[] = X "Copyright 1987,1989 Piercarlo Grandi. All rights reserved."; X X/* X This program is free software; you can redistribute it and/or X modify it under the terms of the GNU General Public License as X published by the Free Software Foundation; either version 2, or X (at your option) any later version. X X This program is distributed in the hope that it will be useful, X but WITHOUT ANY WARRANTY; without even the implied warranty of X MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the X GNU General Public License for more details. X X You may have received a copy of the GNU General Public License X along with this program; if not, write to the Free Software X Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. X*/ X X#undef DEBUG X X/* X Unix programs normally do synchronous read and write, that is, X you read and then you write; no overlap is possible. X X This is especially catastrophic for device to device copies, X whereit is important to minimize elapsed time, by overlapping X activity on one with activity on another. X X To obtain this, a multiprocess structure is necessary under X Unix. This program is functionally equivalento to a pipe, in X that it copies its input (fd 0) to its output (fd 1) link. X X This programs is executed as a Team of N processes, called X Guys, all of which share the same input and output links; the X first reads a chunk of input, awakens the second, writes the X chunk to its output; the second does the same, and the last X awakens the first. X X Since this process is essentially cyclic, we use a ring of X pipes to synchronize the Guys. Each guy has un input pipe from X the upstream guy and an output pipe to the downstream guy. X Whenever a guy receives a READ command from the upstream, it X first reads a block and then passes on the READ command X downstream; it then waits for a WRITE command from upstream, X and then writes the block and after that passes the WRITE X command downstream. A count of how much has been processed is X also passwd along, for statistics and verification. X X Two other commands are used, one is STOP, and is sent X downstream from the guy that detects the end of file of the X input, after which the guy exits, and ABORT, which is sent X downstream from the guy which detects trouble in the guy X upstream to it, which has much the same effect. X*/ X X#define TeamLVOLSZ (1L<<10) X#define TeamHVOLSZ ((long unsigned) 3 * ((long unsigned) 1 << 30)) X X#define TeamLBUFSZ (64) /* Low buffer size */ X#define TeamDBUFSZ (60*512) /* Default buffer size */ X#define TeamHBUFSZ (1L<<20) /* High buffer size */ X X#define TeamDTEAMSZ 4 /* Default # of processes */ X#define TeamHTEAMSZ 16 /* High # of processes */ X X/* X External components... Probably the only system dependent part X of this program, as some systems have something in X /usr/include/sys where others have it in /usr/include. X X Also, the mesg() procedure is highly system dependent... watch X out for locking and variable number of arguments. X*/ X X#include X#include X#include X#include X#include X#include X#include X X#ifdef sun X# undef F_SETLKW X#endif X X#if (PCG) X# include "Extend.h" X# include "Here.h" X# include "Type.h" X#else X# define call (void) X# define were if X# define fast register X# define global /* extern */ X# define local static X# define when break; case X# define otherwise break; default X# define mode(which,name) typedef which name name; which name X# define bool int X# define true 1 X# define false 0 X# define nil(type) ((type) 0) X# define scalar int X typedef char *pointer; X# if (defined(SMALL_M)) X typedef unsigned address; X# else X typedef long address; X# endif X# if (__STDC__) X# define of(list) list X# define on(list) X# define is(list) (list) X# define _ , X# define noparms void X# else X# define void int X# define const /* const */ X# define of(list) () X# define on(list) list X# define is(list) list; X# define _ ; X# define noparms X# endif X#endif X X#ifdef DEBUG X# define Mesg(list) mesg list X#else X# define Mesg(list) X#endif X X/*VARARGS1*/ Xmesg(a,b,c,d,e,f,g,h,i) X char *a; X int b,c,d,e,f,g,h,i; X{ X# if (defined F_SETLKW) X struct flock l; X l.l_whence = 0; l.l_start = 0L; l.l_len = 0L; X l.l_type = F_WRLCK; fcntl(fileno(stderr),F_SETLKW,&l); X# endif X# if (defined LOCK_EX) X flock(fileno(stderr),LOCK_EX); X# endif X fprintf(stderr,a,b,c,d,e,f,g,h,i); X# if (defined LOCK_EX) X flock(fileno(stderr),LOCK_UN); X# endif X# if (defined F_SETLKW) X l.l_type = F_UNLCK; fcntl(fileno(stderr),F_SETLKW,&l); X# endif X} X Xlocal bool verbose = false; Xlocal bool report = true; X Xextern int errno; Xlocal time_t origin; X Xextern time_t time of((time_t *)); Xextern int atoi of((const char *)); Xextern char *malloc of((unsigned)); Xextern char *calloc of((address,unsigned)); Xextern char *strchr of((const char *,int)); X Xextern int getopt of((int,char *[],const char *)); Xextern char *optarg; Xextern int optind; X X/* X The regular Unix read and write calls are not guaranteed to process X all the bytes requested. These procedures guarantee that if the X request is for N bytes, all of them are read or written unless there X is an error or eof. X*/ X X#define FdCLOSED 0 X#define FdOPEN 1 X#define FdEOF 2 X#define FdERROR 3 X Xmode(struct,Fd) X{ X int fd; X short status; X long unsigned size; X}; X Xlocal Fd FdIn,FdOut; X Xlocal bool FdOpen on((fd,ffd,size)) is X( X fast Fd *fd X_ int ffd X_ long unsigned size X) X{ X fd->status = (ffd >= 0) ? FdOPEN : FdCLOSED; X fd->fd = ffd; X fd->size = size; X X Mesg(("FdOpen fd %d\n",ffd)); X X return ffd >= 0; X} X Xlocal bool FdClose on((fd)) is X( X fast Fd *fd X) X{ X int ffd; X X ffd = fd->fd; X Mesg(("FdClose fd %d\n",fd->fd)); X fd->status = FdCLOSED; X fd->fd = -1; X X return close(ffd) >= 0; X} X Xlocal bool FdCopy on((to,from)) is X( X fast Fd *to X_ fast Fd *from X) X{ X to->size = from->size; X to->status = from->status; X to->fd = dup(from->fd); X Mesg(("FdCopy of %d is %d\n",from->fd,to->fd)); X return to->fd >= 0; X} X Xlocal void FdSet on((to,from)) is X( X fast Fd *to X_ fast Fd *from X) X{ X if (from->fd < 0) X mesg("team: set an invalid fd\n"); X to->size = from->size; X to->status = from->status; X to->fd = from->fd; X} X Xlocal long unsigned FdRetry on((fd,which,done,space)) is X( X fast Fd *fd X_ char *which X_ long unsigned done X_ long unsigned space X) X{ X int tty; X char reply[2]; X struct stat st; X X if (fstat(fd->fd,&st) < 0) X { X perror(which); X return 0; X } X X st.st_mode &= S_IFMT; X if (st.st_mode != S_IFCHR && st.st_mode != S_IFBLK) X return 0; X X if (!isatty(fileno(stderr))) X return 0; X X if ((tty = open("/dev/tty",0)) < 0) X { X perror("/dev/tty"); X return 0; X } X X do X { X#if (defined i386 || defined sun) X extern char *(sys_errlist[]); X char *errmsg = sys_errlist[errno]; X#else X char errmsg[32]; X (void) sprintf(errmsg,"Error %d",errno); X#endif X if (errno) X mesg("'%s' on %s after %luk. Continue [cyn] ? ",errmsg,which,done>>10); X else X mesg("EOF on %s after %luk. Continue [cyn] ? ",which,done>>10); X X read(tty,reply,sizeof reply); X } X while (strchr("cCyYnN",reply[0]) == 0); X X call close(tty); X X if (strchr("nN",reply[0]) != 0) X return 0L; X X errno = 0; X X if (strchr("cC",reply[0]) != 0) X { X call lseek(fd->fd,0L,0); X return fd->size; X } X X return space; X} X Xlocal unsigned FdCanDo on((remaining,available)) is X( X fast address remaining X_ fast long unsigned available X) X{ X return (remaining < available) X ? (unsigned) remaining : (unsigned) available; X} X Xlocal address FdRead on((fd,buffer,todo,done)) is X( X fast Fd *fd X_ pointer buffer X_ fast address todo X_ long unsigned done X) X{ X fast long unsigned space; X fast int bytesRead; X fast address justDone; X X switch (fd->status) X { X when FdEOF: return 0; X when FdERROR: return -1; X when FdCLOSED: return -1; X X when FdOPEN: X X space = fd->size - done%fd->size; X X for (justDone = 0; space != 0L && justDone < todo;) X { X bytesRead = read(fd->fd,buffer+justDone, X FdCanDo(todo-justDone,space-justDone)); X X if (bytesRead <= 0 || (justDone += bytesRead) == space) X space = FdRetry(fd,"input",done+justDone,space-justDone); X } X X if (bytesRead == 0) fd->status = FdEOF; X if (bytesRead < 0) fd->status = FdERROR; X X Mesg(("FdRead %d reads %d last %d\n",fd->fd,justDone,bytesRead)); X X return (justDone == 0) ? bytesRead : justDone; X } X /*NOTREACHED*/ X} X Xlocal address FdWrite on((fd,buffer,todo,done)) is X( X fast Fd *fd X_ pointer buffer X_ fast address todo X_ long unsigned done X) X{ X fast long unsigned space; X fast int bytesWritten; X fast address justDone; X X switch (fd->status) X { X when FdEOF: return 0; X when FdERROR: return -1; X when FdCLOSED: return -1; X X when FdOPEN: X X space = fd->size - done%fd->size; X X for (justDone = 0; space != 0L && justDone < todo;) X { X bytesWritten = write(fd->fd,buffer+justDone, X FdCanDo(todo-justDone,space-justDone)); X X if (bytesWritten <= 0 || (justDone += bytesWritten) == space) X space = FdRetry(fd,"output",done+justDone,space-justDone); X } X X Mesg(("FdWrite %d writes %d last %d\n",fd->fd,justDone,bytesWritten)); X X if (bytesWritten == 0) fd->status = FdEOF; X if (bytesWritten < 0) fd->status = FdERROR; X X return (justDone == 0) ? bytesWritten : justDone; X } X /*NOTREACHED*/ X} X X/* X A Token is scalar value representing a command. X*/ X Xtypedef short scalar Token; X X#define TokenREAD 0 X#define TokenWRITE 1 X#define TokenSTOP 2 X#define TokenABORT -1 X X/* X Here we represent Streams as Fds; this is is not entirely X appropriate, as Fds have also a volume size, and relatively X high overhead write and read functions. Well, we just take X some liberties with abstraction levels here. Actually we X should have an Fd abstraction for stream pipes and a Vol X abstraction for input and output... X*/ X Xlocal bool StreamPipe on((downstream,upstream)) is X( X fast Fd *downstream X_ fast Fd *upstream X) X{ X int links[2]; X X if (pipe(links) < 0) X { X perror("team: opening links"); X return false; X } X X Mesg(("StreamPipe fd downstream %d upstream %d\n",links[1],links[0])); X X return FdOpen(downstream,links[1],TeamHVOLSZ) X && FdOpen(upstream,links[0],TeamHVOLSZ); X} X Xmode(struct,StreamMsg) X{ X Token token; X short status; X long unsigned done; X}; X Xlocal bool StreamSend on((fd,token,status,done)) is X( X fast Fd *fd X_ Token token X_ short status X_ long unsigned done X) X{ X fast int n; X StreamMsg message; X X message.token = token; X message.status = status; X message.done = done; X X n = write(fd->fd,(pointer) &message,(unsigned) sizeof message); X X Mesg(("StreamSend fd %u n %d token %d\n",fd->fd,n,token)); X X return n == sizeof message; X} X Xlocal bool StreamReceive on((fd,tokenp,statusp,donep)) is X( X fast Fd *fd X_ Token *tokenp X_ short *statusp X_ long unsigned *donep X) X{ X fast int n; X StreamMsg message; X X n = read(fd->fd,(pointer) &message,(unsigned) sizeof message); X *tokenp = message.token; X *statusp = message.status; X *donep = message.done; X X Mesg(("StreamReceive fd %u n %d token %d\n",fd->fd,n,*tokenp)); X X return n == sizeof message; X} X/* X A guy is an instance of the input to output copier. It is attached X to a relay station, with an upstream link, from which commands X arrive, and a downward link, to which they are relayed once they are X executed. X*/ X Xmode(struct,Guy) X{ X int pid; X Fd upStream; X Fd downStream; X}; X Xlocal bool GuyOpen on((guy,pid,upstream,downstream)) is X( X fast Guy *guy X_ int pid X_ Fd *upstream X_ Fd *downstream X) X{ X Mesg(("GuyOpen pid %u upstream %u downstream %u\n", X pid,upstream->fd,downstream->fd)); X X guy->pid = pid; X FdSet(&guy->upStream,upstream); X FdSet(&guy->downStream,downstream); X X return true; X} X X#define GuySEND(guy,token,status,done) \ X StreamSend(&guy->downStream,token,status,done) X X#define GuyRECEIVE(guy,tokenp,statusp,donep) \ X StreamReceive(&guy->upStream,tokenp,statusp,donep) X Xlocal bool GuyStop of((Guy *,char *,long unsigned)); X Xlocal bool GuyStart on((guy,bufsize)) is X( X fast Guy *guy X_ address bufsize X) X{ X fast char *buffer; X Token token; X short status; X long unsigned done; X bool received; X static int bytesRead,bytesWritten; X X Mesg(("GuyStart guy %#o bufsize %u\n",guy,bufsize)); X X buffer = (pointer) malloc((unsigned) bufsize); X if (buffer == nil(pointer)) X { X mesg("team: guy %d cannot allocate %u bytes\n", X guy->pid,bufsize); X return false; X } X X while ((received = GuyRECEIVE(guy,&token,&status,&done)) && token != TokenSTOP) X switch (token) X { X when TokenREAD: X FdIn.status = status; X X Mesg(("GuyStart reading %d chars\n",bufsize)); X bytesRead = FdRead(&FdIn,(pointer) buffer,bufsize,done); X Mesg(("GuyStart reads %d chars\n",bytesRead)); X X if (bytesRead == 0) GuyStop(guy,nil(char *),done); X if (bytesRead < 0) GuyStop(guy,"error on guy read",done); X X done += bytesRead; X X if (verbose) X mesg("%luk read \r",done>>10); X X if (!GuySEND(guy,TokenREAD,FdIn.status,done)) X GuyStop(guy,"guy cannot send READ",done); X X when TokenWRITE: X FdOut.status = status; X X Mesg(("GuyStart writing %d chars\n",bytesRead)); X bytesWritten = FdWrite(&FdOut,(pointer) buffer,(address) bytesRead,done); X Mesg(("GuyStart writes %d chars\n",bytesWritten)); X X if (bytesWritten == 0) GuyStop(guy,"eof on guy write",done); X if (bytesWritten < 0) GuyStop(guy,"error on guy write",done); X X done += bytesWritten; X X if (verbose) X mesg("%luk written\r",done>>10); X X if (!GuySEND(guy,TokenWRITE,FdOut.status,done)) X GuyStop(guy,"guy cannot send WRITE",done); X X when TokenABORT: X GuyStop(guy,"guy was aborted",0L); X X otherwise: X GuyStop(guy,"impossible token on ring",done); X } X X /* free((char *) buffer); */ X X GuyStop(guy,(received) ? nil(char *) : "error on upstream receive",0L); X /*NOTREACHED*/ X X /*return true;*/ X} X Xlocal bool GuyStop on((guy,errormsg,done)) is X( X fast Guy *guy X_ char *errormsg X_ long unsigned done X) X{ X Mesg(("GuyStop guy %#o\n",guy)); X X if (done) X { X if (report) X mesg("%lu kilobytes, %lu seconds\r\n", X done>>10,(long unsigned) (time((time_t *) 0)-origin)); X else if (verbose) X mesg("\n"); X } X X if (errormsg != nil(char *)) X { X mesg("team: guy pid %u: %s\n",guy->pid,errormsg); X call GuySEND(guy,TokenABORT,FdERROR,0L); X exit(1); X /*NOTREACHED*/ X } X X if (!GuySEND(guy,TokenSTOP,FdEOF,0L)) X { X exit(1); X /*NOTREACHED*/ X } X X exit(0); X /*NOTREACHED*/ X} X Xlocal bool GuyClose on((guy)) is X( X fast Guy *guy X) X{ X return FdClose(&guy->upStream) && FdClose(&guy->downStream); X} X X/* X A team is made up of a ring of guys; each guy copies a blockfrom its X input to its ouput, and is driven by tokens sent to it by the X previous guy on a pipe. X*/ X Xmode(struct,Team) X{ X Guy *guys; X short unsigned size; X short unsigned active; X}; X Xlocal bool TeamOpen on((team,nominalsize)) is X( X Team *team X_ short unsigned nominalsize X) X{ X Mesg(("TeamOpen nominalsize %u\n",nominalsize)); X X team->size = 0; X team->active = 0; X X team->guys = (Guy *) calloc(sizeof (Guy),nominalsize); X X for (team->size = 0; team->size < nominalsize; team->size++); X X were (team->guys == nil(Guy *)) X return false; X X return true; X} X Xlocal bool TeamStart on((team,bufsize,isize,osize)) is X( X fast Team *team X_ address bufsize X_ long unsigned isize X_ long unsigned osize X) X{ X /* X When generating each guy, we pass it an upstream link that X is the downstream of the previous guy, and create a new X downstream link that will be the next upstream. X X At each turn we obviously close the old downstream once it X has been passed to the forked guy. X X A special case are the first and last guys; the upstreamof X the first guy shall be the downstream of the last. This X goes against the grain of our main logic, where the X upstream is expected to already exist and the downstream X must be created. X X This means that the last and first guys are created in a X special way. When creating the first guy we shall create X its upstreamlink as well as its downstream, and we shall X save that in a special variable, last_downstream. This we X shall use as the downstreamof the last guy. X X We shall also keep it open in the team manager (parent X process) because we shall use it to do the initial send of X the read and write tokens that will circulate in the relay X ring, activating the guys. X X Of course because of this each guy will inherit this link X as wellas its upstream and downstream, but shall graciously X close it. X */ X X Fd last_downstream; X Fd this_upstream; X Fd this_downstream; X Fd next_upstream; X X Mesg(("TeamStart team %#o size %u bufsize %u\n", X team,team->size,bufsize)); X X call FdOpen(&FdIn,0,isize); call FdOpen(&FdOut,1,osize); X X for (team->active = 0; team->active < team->size; team->active++) X { X fast Guy *guy; X fast int pid; X X guy = team->guys+team->active; X X if (team->active == 0) X { X if (!StreamPipe(&last_downstream,&this_upstream)) X { X perror("cannot open first link"); X return false; X } X X if (!StreamPipe(&this_downstream,&next_upstream)) X { X perror("cannot open link"); X return false; X } X } X else if (team->active < (team->size-1)) X { X if (!StreamPipe(&this_downstream,&next_upstream)) X { X perror("cannot open link"); X return false; X } X } X else /*if (team->active == team->size-1)*/ X { X FdSet(&this_downstream,&last_downstream); X if (!FdCopy(&last_downstream,&this_downstream)) X perror("team: cannot copy last downstream"); X } X X Mesg(("TeamStart going to fork for guy %#o\n",guy)); X X pid = fork(); X X if (pid > 0) X { X Mesg(("TeamStart forked guy %#o as pid %u\n",guy,pid)); X guy->pid = pid; X X if (!FdClose(&this_upstream)) X perror("cannot close this upstream link"); X if (!FdClose(&this_downstream)) X perror("cannot close this downstream link"); X X FdSet(&this_upstream,&next_upstream); X } X else if (pid == 0) X { X pid = getpid(); X X if (!FdClose(&last_downstream)) X perror("cannot close inherited first link"); X X if (!GuyOpen(guy,pid,&this_upstream,&this_downstream)) X GuyStop(guy,"cannot open guy",0L); X if (!GuyStart(guy,bufsize)) X GuyStop(guy,"cannot start guy",0L); X if (!GuyClose(guy)) X perror("cannot close guy"); X X /*NOTREACHED*/ X } X else if (pid < 0) X { X perror("team: forking a guy"); X return false; X } X } X X if (!StreamSend(&last_downstream,TokenREAD,FdOPEN,0L)) X { X perror("cannot send first READ token"); X return false; X } X X if (!StreamSend(&last_downstream,TokenWRITE,FdOPEN,0L)) X { X perror("cannot send first WRITE token"); X return false; X } X X if (!FdClose(&last_downstream)) X perror("cannot close first link"); X X return true; X} X Xlocal bool TeamWait on((team)) is X( X fast Team *team X) X{ X while (team->active != 0) X { X int guypid; X int status; X X guypid = wait(&status); X if (guypid >= 0) X { X fast short unsigned guyno; X X for (guyno = 0; guyno < team->size; guyno++) X if (guypid == team->guys[guyno].pid) X { X team->guys[guyno].pid = -1; X break; X } X } X else X { X mesg("team: no guys, believed %u left\n",team->active); X return true; X } X X --team->active; X X if (status != 0 && team->active != 0) X return false; X } X X return true; X} X Xlocal bool TeamStop on((team)) is X( X fast Team *team X) X{ X fast short unsigned guyno; X X Mesg(("TeamStop team %#o\n",team)); X X for (guyno = 0; guyno < team->size; guyno++) X { X fast Guy *guy; X X guy = team->guys+guyno; X if (guy->pid >= 0) X { X /*kill(guy->pid,SIGKILL);*/ X --team->active; X } X } X X return team->active == 0; X} X Xlocal bool TeamClose on((team)) is X( X fast Team *team X) X{ X for (team->size; team->size != 0; --team->size) X continue; X X free(team->guys); X X return true; X} X Xlocal void usage of((noparms)) X{ X fprintf(stderr,"\ Xsyntax: team [-[vr]] [-iI[bkm] [-oO[bkm] [N[bkm] [P]]\n\ X copies standard input to output\n\ X -v gives ongoing report, -r final report\n\ X I is input volume size (default %lum)\n\ X O is output volume size (default %lum)\n\ X N is buffer size (default %luk)\n\ X P is number of processes (default %u)\n\ X (postfix b means *512, k means *1KB, m means *1MB)\n\ X", X TeamHVOLSZ>>20,TeamHVOLSZ>>20, X TeamDBUFSZ>>10,TeamDTEAMSZ); X X exit(1); X /*NOTREACHED*/ X} X Xlocal long unsigned atos on((s)) is X( X fast char *s X) X{ X fast unsigned long l; X X for ( X s, l = 0L; X *s >= '0' && *s <= '9'; X s++ X ) X l = l*10L + (long unsigned) (*s-'0'); X X if (*s == 'b') l *= (1L<<9); X if (*s == 'k') l *= (1L<<10); X if (*s == 'm') l *= (1L<<20); X X return l; X} X Xglobal int main on((argc,argv)) is X( X int argc X_ char *(argv[]) X) X{ X Team team; X short unsigned teamsize; X X address bufsize; X long unsigned isize; X long unsigned osize; X int opt; X X teamsize = TeamDTEAMSZ; X bufsize = TeamDBUFSZ; X isize = TeamHVOLSZ; X osize = TeamHVOLSZ; X optind = 1; X X while ((opt = getopt(argc,argv,"vri:o:")) != -1) X switch (opt) X { X when 'i': X isize = atos(optarg); X if (isize < TeamLVOLSZ || isize > TeamHVOLSZ) X { X fprintf(stderr,"team: invalid input volume size %lu\n",isize); X usage(); X } X X when 'o': X osize = atos(optarg); X if (osize < TeamLVOLSZ || osize > TeamHVOLSZ) X { X fprintf(stderr,"team: invalid output volume size %lu\n",osize); X usage(); X } X X when 'v': X verbose ^= 1; X X when 'r': X report ^= 1; X X otherwise: X usage(); X } X X argc -= optind, argv += optind; X X if (argc != 0) X { X bufsize = (address) atos(argv[0]); X if (bufsize < TeamLBUFSZ || bufsize > TeamHBUFSZ) X { X fprintf(stderr,"team: invalid block size %u\n", X bufsize); X usage(); X } X --argc, argv++; X } X X if (argc != 0) X { X teamsize = atoi(argv[0]); X if (teamsize < 2 || teamsize > TeamHTEAMSZ) X { X fprintf(stderr,"team: invalid # of processes %d\n",teamsize); X usage(); X } X --argc, argv++; X } X X if (argc != 0) usage(); X X if (!TeamOpen(&team,teamsize)) X { X mesg("team: cannot setup the team with %u guys\n",teamsize); X return 1; X } X X origin = time((time_t *) 0); X X if (!TeamStart(&team,bufsize,isize,osize)) X { X mesg("team: cannot start the team\n"); X return 1; X } X X if (!TeamWait(&team)) X { X mesg("team: stop remaining %u guys\n",team.active); X X if (!TeamStop(&team)) X { X mesg("team: cannot stop the team\n"); X return 1; X } X } X X if (!TeamClose(&team)) X { X mesg("team: cannot close the team\n"); X return 1; X } X X return 0; X} SHAR_EOF if test 23313 -ne "`wc -c < 'team.c.dist'`" then echo shar: error transmitting "'team.c.dist'" '(should have been 23313 characters)' fi fi # end of overwriting check # End of shell archive exit 0