home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Club Amiga de Montreal - CAM
/
CAM_CD_1.iso
/
files
/
215.lha
/
Pipe_v2.02
/
pipe.c
< prev
next >
Wrap
C/C++ Source or Header
|
1996-02-15
|
23KB
|
843 lines
/*
* PIPE.C V2.03
*
* YOU MUST COMPILE +BCDL, Aztec (32 bit ints).
*
* This represents a complete turnaround in functionality from V1. I've
* re-written it completely from scratch. The PIPE: device now supports
* bi-directional connections through a single file-handle, dynamic buffer
* sizing, signal capability, and an almost-terminal-like capability.
*
* NOTE NOTE NOTE: In most cases you simply use the same format as Version
* 1.. that is, redirect <pipe:name to get the data redirected >pipe:name,
* don't get turned off by the technical discussion below.
*
*/
/*
* Some #include's may be missing since I simply give an Aztec compiler
* option to load the entire symbol table and it doesn't actually go
* through these includes.
*/
#include <exec/types.h>
#include <exec/nodes.h>
#include <exec/lists.h>
#include <exec/ports.h>
#include <exec/libraries.h>
#include <exec/devices.h>
#include <exec/io.h>
#include <exec/memory.h>
#include <devices/console.h>
#include <libraries/dos.h>
#include <libraries/dosextens.h>
#include <libraries/filehandler.h>
#include <local/xmisc.h>
#define MPC (MEMF_PUBLIC|MEMF_CLEAR) /* options to AllocMem() */
#define ACTION_READWRITE 1004
#define ACTION_FIND_INPUT 1005 /* various ACTION's supported */
#define ACTION_FIND_OUTPUT 1006
#define ACTION_END 1007
#define ACTION_EXAMINE 23
#define ACTION_EXAMINENEXT 24
#define ACTION_LOCATE 8
#define ACTION_FREELOCK 15
#define ACTION_COPYDIR 19
#undef BADDR
#define BADDR(x) ((APTR)((long)x << 2)) /* convert BCPL->APTR */
#define DOS_FALSE 0
#define DOS_TRUE -1
#define ST_REMOTE 0x01 /* this is the remote side */
#define ST_WASOPEN 0x02 /* this side openned at least once */
#define ST_REMEOF 0x04 /* other side closed */
#define ST_TERM 0x08 /* terminal pipe:name[/flags] */
#define ST_COOKED 0x10 /* cooked mode. */
#define ST_STAR 0x20 /* CLI strangeness '*' open */
#define ST_WATER 0x40 /* waiting to go below low-water mark */
typedef struct FileLock LOCK;
typedef struct FileInfoBlock FIB;
typedef struct DosPacket DOSPACKET;
typedef struct Process PROC;
typedef struct DeviceNode DEVNODE;
typedef struct FileHandle FH;
typedef struct CommandLineInterface CLI;
typedef unsigned char u_char;
typedef struct Message MSG;
extern XLIST *llink(), *lunlink();
/*
* a PBUF is part of a linked list of memory buffers waiting to be read
* on a pipe.
*/
typedef struct _PBUF {
struct _PBUF *next;
long bytes;
long index;
char buf[4];
} PBUF;
/*
* The PIPE structure itself. Actually consists of a linked pair of
* PIPE structures. The other is accessed through pipe->remote and
* vise-versa. One of the structures is the MASTER, the other is the
* SLAVE. The SLAVE has the ST_REMOTE flag set. The XLIST and NAME
* entries are used only by the MASTER end for bookkeeping.
*/
typedef struct _PIPE {
XLIST list; /* linked list of all pipes */
MSG *rpend; /* whos waiting to read from me */
MSG *wpend; /* whos waiting to write to me */
char *name; /* name of this pipe */
struct _PIPE *remote; /* linked pair... other PIPE */
short refs; /* # Open references */
short stat; /* Status.. ST_XXXX flags */
PBUF *pbase, *plast; /* buffers waiting to be read? */
long total; /* total # bytes in buffers */
long lowwater; /* below which writes are re-enabled */
long maxbytes; /* Maximum total # bytes allowed */
PROC *procsignal; /* process to signal on data present */
short signum; /* signal number */
} PIPE;
/*
* NOTE: Globals in the Bss space are not automatically initialized to
* 0 when you don't use Aztec's startup _main, which we can't since
* this is a device.
*/
extern DOSPACKET *taskwait(); /* wait for a message */
extern void *AllocMem();
extern void *AllocDS();
extern void *strcpy();
extern void attemptread(); /* the MEAT */
extern PROC *FindTask();
long SysBase; /* required to make Exec calls */
long DOSBase, Fh; /* Debugging.. required for DOS */
char Buf[256]; /* Scratch buffer */
_whocareswhatthisisnamed()
{
PROC *myproc;
DEVNODE *mynode;
PIPE *Pipe = NULL;
u_char notdone;
SysBase = *(long *)4;
Fh = NULL;
DOSBase = NULL;
myproc = (PROC *)FindTask(0L);
/*
* INITIAL STARTUP MESSAGE
*/
{
register DOSPACKET *mypkt;
mypkt = taskwait(myproc);
mynode = (DEVNODE *)BADDR(mypkt->dp_Arg3);
mynode->dn_Task = &myproc->pr_MsgPort;
returnpkt(mypkt, myproc, DOS_TRUE, 0);
}
top:
#ifdef XDEBUG
DOSBase = OpenLibrary("dos.library", 0);
Fh = Open("con:0/0/400/100/pipedevice", 1006);
#endif
/*
* MAIN LOOP
*/
notdone = 1;
while(notdone) {
register DOSPACKET *mypkt; /* dos packet received */
register PIPE *pipe; /* pointer to current PIPE */
long type; /* type of packet */
mypkt = taskwait(myproc); /* wait/get next packet */
#ifdef XDEBUG
if (Fh)
fhprintf(Fh, "OldRes1 = %8lx\n", mypkt->dp_Res1);
#endif
mypkt->dp_Res1 = DOS_TRUE; /* default return value */
mypkt->dp_Res2 = 0; /* default no error */
type = mypkt->dp_Type; /* packet type */
/*
* Extract pipe pointer (only applies to read/write)
*/
pipe = (PIPE *)mypkt->dp_Arg1;
#ifdef XDEBUG
if (Fh) {
sprintf(Buf, "Packet: %4ld for %ld, pipe: %-8lx\n", mypkt->dp_Type, mypkt->dp_Arg3, pipe);
Write(Fh, Buf, strlen(Buf));
}
#endif
switch(type) {
case ACTION_FIND_INPUT:
case ACTION_FIND_OUTPUT:
case ACTION_READWRITE:
{
register PIPE *premote; /* linked pair: other PIPE */
register u_char *ptr, *s2; /* scratch vars */
register FH *fh; /* File handle from dp_Arg1 */
short remflags = 0; /* ST_XXX flags for premote */
short termflag = 0; /* standard flags */
long maxbytes = 4096; /* default max. buffer size */
PROC *procsignal = NULL; /* default process to sig. */
short signum = -1;
short queryflag = 0; /* query option? */
fh = (FH *)BADDR(mypkt->dp_Arg1); /* File handle */
ptr = (u_char *)BADDR(mypkt->dp_Arg3); /* PATHNAME */
BMov(ptr+1, Buf, *ptr); /* BCPL strangeness */
Buf[*ptr] = 0;
/*
* If '*' is the PATHNAME, then it is the CLI asking us
* to duplicate it's proc->pr_CIS entry.
*/
#ifdef XDEBUG
if (Fh) {
fhprintf(Fh, "fh->fh_Buf == %08lx\n", fh->fh_Buf);
fhprintf(Fh, "fh->fh_Pos == %08lx\n", fh->fh_Pos);
fhprintf(Fh, "fh->fh_End == %08lx\n", fh->fh_End);
fhprintf(Fh, "fh->fh_Arg1 == %08lx\n", fh->fh_Arg1);
fhprintf(Fh, "fh->fh_Arg2 == %08lx\n", fh->fh_Arg2);
}
#endif
if (strcmp(Buf, "*") == 0) {
register PROC *proc;
register FH *fh;
proc = (PROC *)mypkt->dp_Port->mp_SigTask;
#ifdef XDEBUG
if (Fh) {
fhprintf(Fh, "CLI -> STDIN %08lx ARG1 %08lx\n", ((CLI *)BADDR(proc->pr_CLI))->cli_StandardInput,
((FH *)BADDR(((CLI *)BADDR(proc->pr_CLI))->cli_StandardInput))->fh_Arg1);
fhprintf(Fh, "CLI -> STDOU %08lx ARG1 %08lx\n", ((CLI *)BADDR(proc->pr_CLI))->cli_StandardOutput,
((FH *)BADDR(((CLI *)BADDR(proc->pr_CLI))->cli_StandardOutput))->fh_Arg1);
fhprintf(Fh, "CLI -> CURIN %08lx ARG1 %08lx\n", ((CLI *)BADDR(proc->pr_CLI))->cli_CurrentInput,
((FH *)BADDR(((CLI *)BADDR(proc->pr_CLI))->cli_CurrentInput))->fh_Arg1);
fhprintf(Fh, "CLI -> CUROU %08lx ARG1 %08lx\n", ((CLI *)BADDR(proc->pr_CLI))->cli_CurrentOutput,
((FH *)BADDR(((CLI *)BADDR(proc->pr_CLI))->cli_CurrentOutput))->fh_Arg1);
fhprintf(Fh, "OPEN*: %08lx %08lx %08lx\n", mypkt->dp_Arg1, mypkt->dp_Arg2, mypkt->dp_Arg3);
fhprintf(Fh, " pr_CIS: %08lx // Arg1 %08lx\n", proc->pr_CIS, ((FH *)BADDR(proc->pr_CIS))->fh_Arg1);
fhprintf(Fh, " pr_COS: %08lx // Arg1 %08lx\n", proc->pr_COS, ((FH *)BADDR(proc->pr_COS))->fh_Arg1);
}
#endif
if (proc->pr_CIS) {
pipe = (PIPE *)((FH *)BADDR(proc->pr_CIS))->fh_Arg1;
pipe->stat |= ST_STAR; /* FLAG IT..is CLI */
type = ACTION_FIND_INPUT; /* SLAVE ONLY */
goto skip;
} else {
pipe = Pipe; /* most recent open */
pipe->stat |= ST_STAR;
type = ACTION_FIND_INPUT; /* SLAVE ONLY */
goto skip;
}
}
/*
* Process PIPE options. Note that I don't manually
* skip over ascii-numerics since that happens
* automatically in the inner for() loop.
*
* FLAGS ARE NOT PART OF THE PIPE NAME! pipe:a and
* pipe:a/n refer to the same PIPE.
*/
for (ptr = (u_char *)Buf; *ptr; ++ptr) {
if (*ptr == '/') {
*ptr = 0; /* not part of name */
for (++ptr; *ptr; ++ptr) {
if (*ptr == 'S') /* SLAVE */
type = ACTION_FIND_INPUT;
if (*ptr == 'M') /* MASTER */
type = ACTION_FIND_OUTPUT;
if (*ptr == 'c')
;
if (*ptr == 'q') /* QUERY */
queryflag = 1;
if (*ptr == 'n') /* INFINITE BUFFERING */
maxbytes = 0x1FFFFFFF;
if (*ptr == 't') /* TTY */
termflag |= ST_TERM;
if (*ptr == 'b') { /* BUFFER SIZE */
maxbytes = atoi(ptr+1);
if (maxbytes < 0)
maxbytes = 0;
if (maxbytes > 0x1FFFFFFF)
maxbytes = 0x1FFFFFFF;
}
if (*ptr == 's') { /* SIGNAL */
signum = atoi(ptr+1);
procsignal = (PROC *)mypkt->dp_Port->mp_SigTask;
}
}
break;
}
}
/*
* See if PIPE exists already.
*/
for (pipe = Pipe; pipe; pipe = (PIPE *)pipe->list.next) {
if (strcmp(pipe->name, Buf) == 0)
break;
}
if (pipe == NULL) {
if (queryflag) /* Query Failed */
goto fail;
pipe = (PIPE *)AllocMem(sizeof(PIPE), MPC);
premote = (PIPE *)AllocMem(sizeof(PIPE), MPC);
pipe->name = (char *)strcpy(AllocMem(strlen(Buf)+1, 0), Buf);
pipe->remote = premote;
pipe->stat |= termflag;
pipe->maxbytes = premote->maxbytes = 4096;
pipe->lowwater = premote->lowwater = 2048;
premote->remote = pipe;
premote->stat |= ST_REMOTE|termflag;
llink(&Pipe, pipe);
#ifdef XDEBUG
if (Fh) {
sprintf(Buf, "Pipe: %8lx, Pipe->next %8lx\n",
Pipe, Pipe->list.next);
Write(Fh, Buf, strlen(Buf));
}
#endif
}
/*
* If openning the SLAVE, we should point to the SLAVE
* part of the pipe structure.
*/
if (type == ACTION_FIND_INPUT)
pipe = pipe->remote;
if (queryflag) {
if (pipe->remote->refs == 0) {
fail: mypkt->dp_Res1 = DOS_FALSE;
mypkt->dp_Res2 = ERROR_OBJECT_NOT_FOUND;
goto done;
}
}
skip:
++pipe->refs;
pipe->procsignal = procsignal;
pipe->signum = signum;
pipe->remote->maxbytes = maxbytes;
pipe->remote->lowwater = maxbytes >> 1;
pipe->remote->stat |= remflags;
if (pipe->remote->lowwater < 2048)
pipe->remote->lowwater = maxbytes;
/*
* the ST_WASOPEN flag is not set on open for 'reads'.
* Rather, it is set when the read operation is actually
* done. This is so programs like C:COPY can do an
* Open(name,1005)/Close() to check for existance before
* actually openning the destination 1006 without
* screwing up the PIPE.
*/
if (type == ACTION_FIND_OUTPUT)
pipe->stat |= ST_WASOPEN;
pipe->remote->stat &= ~ST_REMEOF;
fh->fh_Arg1 = (long)pipe;
fh->fh_Port = (struct MsgPort *)DOS_TRUE;
if ((pipe->wpend || pipe->pbase) && pipe->procsignal)
Signal(pipe->procsignal, 1 << pipe->signum);
}
done:
returnpktplain(mypkt, myproc);
break;
case ACTION_END: /* CLOSE */
--pipe->refs;
/*
* If closing process had a signal, we had better not
* signal that process anymore.
*/
if (pipe->procsignal == (PROC *)mypkt->dp_Port->mp_SigTask)
pipe->procsignal = NULL;
/*
* Check to see if we can deallocate the PIPE. The ST_REMEOF
* flag means that one end has closed communications and
* exited while the other end was still active. This causes
* an EOF on Read() and an error on Write().
*/
if (pipe->refs == 0) {
if (pipe->stat & ST_WASOPEN)
pipe->remote->stat |= ST_REMEOF;
attemptread(pipe->remote, myproc);
if (pipe->remote->stat & ST_WASOPEN) {
if (pipe->remote->refs == 0) {
register PIPE *master = pipe;
if (pipe->stat & ST_REMOTE)
master = pipe->remote;
lunlink(master);
#ifdef XDEBUG
if (Fh) {
sprintf(Buf, "Unlink %8lx\n", master);
Write(Fh, Buf, strlen(Buf));
}
#endif
FreeMem(master->name, strlen(master->name)+1);
freepbufs(master);
freepbufs(master->remote);
FreeMem(master->remote, sizeof(PIPE));
FreeMem(master, sizeof(PIPE));
if (Pipe == NULL) /* Can we exit the process? */
notdone = 0;
#ifdef XDEBUG
if (Fh) {
sprintf(Buf, "Pipe = %8lx\n", Pipe);
Write(Fh,Buf,strlen(Buf));
}
#endif
}
}
}
returnpktplain(mypkt, myproc);
break;
case ACTION_READ:
pipe->stat |= ST_WASOPEN;
mypkt->dp_Res1 = 0; /* default return value is 0 */
llinkend(&pipe->rpend, mypkt->dp_Link);
attemptread(pipe, myproc);
break;
case ACTION_WRITE:
mypkt->dp_Res1 = 0; /* default return value is 0 */
if (pipe->stat & ST_REMEOF) {
mypkt->dp_Res1 = -1;
mypkt->dp_Res2 = ERROR_SEEK_ERROR;
returnpktplain(mypkt, myproc);
break;
}
llinkend(&pipe->remote->wpend, mypkt->dp_Link);
attemptread(pipe->remote, myproc);
break;
case ACTION_COPYDIR: /* 1:lock res1:newlock */
{
register LOCK *lock = AllocDS(sizeof(LOCK));
BMov(mypkt->dp_Arg1 << 2, lock, sizeof(LOCK));
returnpkt(mypkt, myproc, (long)lock << 2, 0);
}
break;
case ACTION_LOCATE: /* 1:lock 2:name 3:mode res1:lock/NULL */
{
register LOCK *lock;
register char *str = (char *)(mypkt->dp_Arg2 << 2);
pipe = (PIPE *)&Pipe;
if (str[str[0]] != ':') {
for (pipe = Pipe; pipe; pipe = (PIPE *)pipe->list.next) {
if (str[0] == strlen(pipe->name) && strncmp(str+1, pipe->name, str[0]) == 0)
break;
}
}
if (pipe) {
lock = AllocDS(sizeof(LOCK));
lock->fl_Key = (long)pipe;
lock->fl_Access = mypkt->dp_Arg3;
lock->fl_Task = &myproc->pr_MsgPort;
lock->fl_Volume = (BPTR)((long)mynode >> 2);
returnpkt(mypkt, myproc, (long)lock >> 2, 0);
} else {
returnpkt(mypkt, myproc, NULL, ERROR_OBJECT_NOT_FOUND);
}
}
break;
case ACTION_FREELOCK: /* 1:lock res1:bool */
{
if (mypkt->dp_Arg1)
FreeDS(mypkt->dp_Arg1 << 2);
returnpkt(mypkt, myproc, DOS_TRUE, 0);
}
break;
case ACTION_EXAMINE: /* 1:lock 2:fib res1:bool */
{
register LOCK *lock = (LOCK *)(mypkt->dp_Arg1 << 2);
register FIB *fib = (FIB *)(mypkt->dp_Arg2 << 2);
BZero(fib, sizeof(FIB));
pipe = (PIPE *)(fib->fib_DiskKey = lock->fl_Key);
if (pipe == (PIPE *)&Pipe) {
fib->fib_DirEntryType = 1;
strcpy(fib->fib_FileName+1, "PIPE:");
} else {
fib->fib_DirEntryType = -1;
strcpy(fib->fib_FileName+1, pipe->name);
fib->fib_Size = pipe->total + pipe->remote->total;
}
fib->fib_FileName[0] = strlen(fib->fib_FileName+1);
}
returnpkt(mypkt, myproc, DOS_TRUE, 0);
break;
case ACTION_EXAMINENEXT: /* 1:lock 2:fib res1:bool */
{
register FIB *fib = (FIB *)(mypkt->dp_Arg2 << 2);
register PIPE *pipe = (PIPE *)fib->fib_DiskKey;
if (pipe == (PIPE *)&Pipe)
pipe = Pipe;
else
pipe = (PIPE *)pipe->list.next;
if (pipe) {
fib->fib_DirEntryType = -1;
strcpy(fib->fib_FileName+1, pipe->name);
fib->fib_Protection = 0;
fib->fib_Size = pipe->total + pipe->remote->total;
fib->fib_NumBlocks = 0;
/*
sprintf(fib->fib_Comment+1, "this is a comment");
*/
fib->fib_Comment[1] = 0;
fib->fib_DiskKey = (long)pipe;
fib->fib_FileName[0] = strlen(fib->fib_FileName+1);
fib->fib_Comment [0] = strlen(fib->fib_Comment +1);
returnpkt(mypkt, myproc, DOS_TRUE, 0);
} else {
returnpkt(mypkt, myproc, DOS_FALSE, ERROR_NO_MORE_ENTRIES);
}
}
break;
default:
returnpkt(mypkt, myproc, DOS_FALSE, ERROR_ACTION_NOT_KNOWN);
break;
}
}
#ifdef XDEBUG
if (Fh)
Close(Fh);
#endif
if (DOSBase)
CloseLibrary(DOSBase);
/*
* Can only exit if no messages pending. There might be a window
* here, but there is nothing that can be done about it.
*/
Forbid();
if (taskpktrdy(myproc)) {
Permit();
goto top;
}
mynode->dn_Task = FALSE;
Permit();
/* we are a process "so we fall off the end of the world" */
/* MUST fall through */
}
void *
AllocDS(bytes)
{
register long *ptr;
bytes += 4;
ptr = AllocMem(bytes, MEMF_PUBLIC|MEMF_CLEAR);
*ptr = bytes;
return(ptr + 1);
}
FreeDS(ptr)
register long *ptr;
{
if (ptr)
FreeMem(ptr - 1, *(ptr-1));
}
/*
* All purpose routine which does everything. Basically:
*
* (A) Fill as many read requests as possible with data from the SBUFs
* (B) Fill as many read requests as possible with data from pending
* write requests.
* (C) Place as many write requests into SBUFs so we can return them.
*
* Also handles EOF and signalling conditions.
*/
void
attemptread(pipe, myproc)
register PIPE *pipe;
PROC *myproc;
{
MSG *mr, *mw;
register DOSPACKET *rp, *wp;
register PBUF *pb;
register long rleft, wleft, len;
#ifdef XDEBUG
if (Fh) {
if (pipe->stat & ST_REMOTE)
Write(Fh, "Reader: ", 8);
else
Write(Fh, "Writer: ", 8);
}
#endif
/*
* Fill as many read requests as possible from buffer list.
*/
while ((mr = pipe->rpend) && (pb = pipe->pbase)) {
rp = (DOSPACKET *)mr->mn_Node.ln_Name;
rleft = rp->dp_Arg3 - rp->dp_Res1;
wleft = pb->bytes - pb->index;
if (wleft > rleft)
wleft = rleft;
BMov(pb->buf + pb->index, rp->dp_Arg2 + rp->dp_Res1, wleft);
if ((rp->dp_Res1 += wleft) == rp->dp_Arg3) {
lunlink(mr);
returnpktplain(rp, myproc);
}
if ((pb->index += wleft) == pb->bytes) {
pipe->pbase = pb->next;
if (pipe->pbase == NULL)
pipe->plast = NULL;
pipe->total -= pb->bytes;
#ifdef XDEBUG
if (Fh) {
sprintf(Buf, "FreeMem: %8lx %ld\n", pb, sizeof(PBUF)- sizeof(pb->buf) + pb->bytes);
Write(Fh, Buf, strlen(Buf));
}
#endif
FreeMem(pb, sizeof(PBUF) - sizeof(pb->buf) + pb->bytes);
}
}
if (pipe->total < pipe->lowwater)
pipe->stat &= ~ST_WATER;
/*
* Fill as many read requests as possible from pending write requests
*/
while ((mr = pipe->rpend) && (mw = pipe->wpend)) {
rp = (DOSPACKET *)mr->mn_Node.ln_Name;
wp = (DOSPACKET *)mw->mn_Node.ln_Name;
/* wp->dp_Arg2 is buf, wp->dp_Arg3 is #, wp->dp_Res1 is actual */
/* rp->dp_Arg2 is buf, rp->dp_Arg3 is #, rp->dp_Res1 is actual */
rleft = rp->dp_Arg3 - rp->dp_Res1; /* # left to read */
wleft = wp->dp_Arg3 - wp->dp_Res1; /* # left to write */
if (wleft > rleft) /* wleft is amount to actually transfer */
wleft = rleft;
BMov(wp->dp_Arg2 + wp->dp_Res1, rp->dp_Arg2 + rp->dp_Res1, wleft);
if ((wp->dp_Res1 += wleft) == wp->dp_Arg3) {
#ifdef XDEBUG
if (Fh)
Write(Fh, "wexhaust ", 9);
#endif
lunlink(mw);
returnpktplain(wp, myproc);
}
if ((rp->dp_Res1 += wleft) == rp->dp_Arg3) {
#ifdef XDEBUG
if (Fh)
Write(Fh, "rexhaust ", 9);
#endif
lunlink(mr);
returnpktplain(rp, myproc);
}
}
/*
* If REMOTE EOF, return any remaining read requests. However, if
* was from a NEWCLI, then do big hack and don't return the requests
* since it will crash the system. That is, the requests stay pending.
* If we were openned with a signal option, return remaining read
* requests with an error (-1).
*
* Note: partially filled read requests are returned normally.
*/
if ((pipe->stat & (ST_REMEOF|ST_STAR)) == ST_REMEOF) {
while (mr = pipe->rpend) {
lunlink(mr);
rp = (DOSPACKET *)mr->mn_Node.ln_Name;
if (pipe->procsignal && rp->dp_Res1 == 0) {
rp->dp_Res1 = -1;
rp->dp_Res2 = ERROR_NO_MORE_ENTRIES; /* nothing else comes close. */
}
returnpktplain(rp, myproc);
}
return;
}
/*
* If a pipe has a signal attached to it:
* (A) signal the process if any writes are pending
* (B) return ALL read requests always immediately whether they are
* fullfilled or not.
*/
if (pipe->procsignal) {
if (pipe->wpend)
Signal(pipe->procsignal, 1 << pipe->signum);
while (mr = pipe->rpend) {
lunlink(mr);
rp = (DOSPACKET *)mr->mn_Node.ln_Name;
returnpktplain(rp, myproc);
}
}
/*
* Put any remaining write requests into buffers, if possible.
* Do not copy new write requests if we haven't gotten back to the
* low water mark yet
*/
while ((mw = pipe->wpend) && !(pipe->stat & ST_WATER)) {
wp = (DOSPACKET *)mw->mn_Node.ln_Name;
wleft = wp->dp_Arg3 - wp->dp_Res1;
if (pipe->total + wleft > pipe->maxbytes) {
pipe->stat |= ST_WATER;
break;
}
pb = (PBUF *)AllocMem(sizeof(PBUF) + wleft - sizeof(pb->buf), 0);
#ifdef XDEBUG
if (Fh) {
sprintf(Buf, "Allocate: %8lx %8ld %ld\n", pb, wleft, sizeof(PBUF) + wleft - sizeof(pb->buf));
Write(Fh, Buf, strlen(Buf));
}
#endif
if (pb == NULL) /* Sorry, no go. */
break;
BMov(wp->dp_Arg2 + wp->dp_Res1, pb->buf, wleft);
pb->next = NULL;
pb->bytes = wleft;
pb->index = 0;
if (pipe->plast) {
pipe->plast->next = pb;
pipe->plast = pb;
} else {
pipe->pbase = pipe->plast = pb;
}
wp->dp_Res1 += wleft;
lunlink(mw);
returnpktplain(wp, myproc);
pipe->total += wleft;
}
/*
* If in Terminal mode, a read request need only a single byte
* to return.
*/
if ((pipe->stat & ST_TERM) && (mr = pipe->rpend)) {
rp = (DOSPACKET *)mr->mn_Node.ln_Name;
if (rp->dp_Res1) {
lunlink(mr);
returnpktplain(rp, myproc);
}
}
#ifdef XDEBUG
if (Fh)
Write(Fh, "\n", 1);
#endif
}
/*
* Link a request onto the end of a doubly linked list. Requests must
* be processed in order.
*/
llinkend(px, link)
register XLIST **px, *link;
{
while (*px)
px = &(*px)->next;
*px = link;
link->prev = px;
link->next = NULL;
}
freepbufs(pipe)
register PIPE *pipe;
{
register PBUF *pb, *npb;
for (pb = pipe->pbase; pb; pb = npb) {
npb = pb->next;
FreeMem(pb, sizeof(PBUF) - sizeof(pb->buf) + pb->bytes);
}
pipe->pbase = pipe->plast = NULL;
}
XLIST *
llink(list, en)
register XLIST *en, **list;
{
en->next = *list;
en->prev = list;
*list = en;
if (en->next)
en->next->prev = &en->next;
return(en);
}
XLIST *
lunlink(en)
register XLIST *en;
{
if (en) {
if (en->next)
en->next->prev = en->prev;
*en->prev = en->next;
en->next = NULL;
en->prev = NULL;
}
return(en);
}