Next | Prev | Up | Top | Contents | Index

Interprocess Communication

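The program in Example A-6 illustrates the use of some of the interprocess communication (IPC) features of IRIX, in particular: a shared arena created with usinit(); semaphores and a lock allocated in that arena, with metering enabled; and lightweight processes created with sproc().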

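The program models a real-time data-collection program. The main process establishes an arena. Within the arena it creates a data structure that defines and manages a ring buffer. Then the main process uses sproc() to create three processes: one instance of inputProcess(), which simulates data collection by putting items into the ring buffer, and two instances of outputProcess(), which simulate data output by taking items out of it.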

After starting the three processes, the main process waits for one to terminate. When there are no errors, inputProcess() is the first and only process to terminate--the two outputProcess() instances end up blocked on a semaphore, waiting for more data.

The main process then displays the metering information from the lock and semaphores, kills the remaining processes, and terminates the program.

The three simulated real-time processes communicate through two semaphores and a lock.

The displayed metering data at the end of the program shows whether the output processes could keep up with the input process. It is necessary to run the program with a nondegrading real-time priority to get consistent results. The output in Example A-4 shows a case in which output did not keep up.

Example A-4 : Producer/Consumer Program Test 1

# npri -h 39 ./ringBuffer -t 20000
Lock lockRBupdate acquired 4004 times, 4004 without waiting (100%)
Metering info on sema semRBdata
  P: 2004, 2000 with no wait (99%)
  V: 2002, 2 with P waiting (0%)
Metering info on sema semRBspace
  P: 2002, 1423 with no wait (71%)
  V: 2002, 579 with P waiting (28%)
In Example A-4, look first at the P operations for semRBspace. 71% of the time, when inputProcess() applied uspsema() to this semaphore to acquire a slot in the ring buffer, it did not wait. However, about 29% of the time (579 of 2002 operations; the program truncates the percentage and prints 28%) it did wait, meaning that the ring buffer was full and no free slots were available until an outputProcess() released one. Clearly, the output processes were not keeping up with the input data rate.

Example A-5 : Producer/Consumer Program Test 2

# npri -h 39 ./ringBuffer -t 5000
Lock lockRBupdate acquired 4004 times, 4004 without waiting (100%)
Metering info on sema semRBdata
  P: 2004, 1565 with no wait (78%)
  V: 2002, 437 with P waiting (21%)
Metering info on sema semRBspace
  P: 2002, 2002 with no wait (100%)
  V: 2002, 0 with P waiting (0%)
Example A-5 shows a test run in which the output processes did keep up with the input rate. In every case, inputProcess() was able to acquire a slot from semRBspace without waiting. 22% of the time, when an outputProcess() tried to acquire a data item from semRBdata, it had to wait, meaning the ring buffer was empty. (This percentage would be higher if inputProcess() did not frequently dump blocks of 2-16 items into the buffer.)

Example A-6 : Producer/Consumer Program Demonstrating IPC Functions

#include <stdlib.h> /* for getopt() */
#include <signal.h>
#include <sys/time.h>
#include <ulocks.h>
#include <math.h> /* for random() and srandom() */
#include <sys/types.h> /* for pid_t */
#include <sys/wait.h> /* for wait() */
/*
|| The following declarations define a structure that controls a ring buffer.
|| 
||  rbElem_t    the type of thing that is stored in the buffer
||  RB_MAXELS   the size of the ring buffer
||  rbStruct_t  control and serialization items for the buffer
||
||  The buffer and structure are built together in an arena, and the 
||  address of the structure is the arena info (usgetinfo()).
*/
typedef long rbElem_t;  /* can be any scalar, but assumed long below */
#define RB_MAXELS 160   /* specify enough to buffer the peak data rate */
/*
|| Control block for the ring buffer.  allocRBStuff() allocates one of
|| these in the arena, creates semRBdata with an initial value of 0 and
|| semRBspace with an initial value of RB_MAXELS, and stores the address
|| of the structure as the arena info word.
*/
typedef struct rbStruct {
    rbElem_t * theBuffer;   /* -> [RB_MAXELS] of rbElem_t */
    usema_t * semRBdata;    /* -> semaphore for buffered data */
    usema_t * semRBspace;   /* -> semaphore for open buffer slots */
    ulock_t * lockRBupdate; /* -> lock on the following words */
    int rbGet;              /* theBuffer[rbGet] is next live data; advanced by getElement() */
    int rbPut;              /* theBuffer[rbPut] is next empty slot; putElement() wraps it to 0 at RB_MAXELS */
} rbStruct_t;
/*
|| The following constants are default values for the global parameters.
|| See the prologs of the inputProcess and outputProcess functions.
||
||  MAX_BURST    modulus used by inputProcess to pick the burst length
||  FINAL_COUNT  default number of items to buffer before shutting down
||  OUTPUT_TIME  default itimer interval, in usec, for each outputProcess
||
|| FINAL_COUNT is wrapped in an outer pair of parentheses so that it
|| expands to a single operand; without them an expression such as
|| (x / FINAL_COUNT) would divide by 200 and then multiply by 10.
*/
#define MAX_BURST 16 /* data rate average 25*16/2 == 200/sec */
#define FINAL_COUNT (((MAX_BURST/2)*25)*10) /* run for ~10 seconds */
#define OUTPUT_TIME 10000 /* 10 ms delay or 100 Hz */
/*
|| The following global variables are input parameters to the
|| child processes. They are set by main() from command line arguments.
*/
static int outputTimer = OUTPUT_TIME;   /* -t argument */
static int inputCount = FINAL_COUNT;    /* -c argument */
/*
|| Build the shared arena and plant the ring buffer control structure
|| in it.  On success the arena handle is returned and the address of
|| the rbStruct_t has been stored as the arena info word; on any
|| failure a message is written and NULL is returned.
||
|| The arena is backed by the following file.  It must address a
|| writeable directory, and if the file already exists it must be
|| writeable to this process.
*/
#define ARENA_FILE "/var/tmp/ring.buffer.arena"
usptr_t *
allocRBStuff()
{
    usptr_t * arena;
    rbStruct_t * rbs;

    /*
    || Ask for metering info to be kept with our locks.  This is done
    || before the arena is created.
    */
    if (usconfig(CONF_LOCKTYPE, US_DEBUGPLUS) == -1)
    {
        perror("usconfig(CONF_LOCKTYPE)");
        return NULL;
    }
    arena = usinit(ARENA_FILE);
    if (arena == NULL)
    {
        perror("usinit");
        return NULL;
    }
    /*
    || The arena exists now, so every failure below leaves through the
    || "failed" label, which detaches it.
    */
    rbs = (rbStruct_t *)uscalloc(1, sizeof(rbStruct_t), arena);
    if (rbs == NULL)
    {
        fprintf(stderr, "Unable to allocate anything in arena\n");
        goto failed;
    }
    rbs->theBuffer = (rbElem_t *)uscalloc(RB_MAXELS, sizeof(rbElem_t), arena);
    if (rbs->theBuffer == NULL)
    {
        fprintf(stderr, "Unable to allocate ring buffer in arena\n");
        goto failed;
    }
    /* no data is buffered yet, so semRBdata starts at 0 */
    rbs->semRBdata = usnewsema(arena, 0);
    if (rbs->semRBdata == NULL)
    {
        perror("usnewsema #1");
        goto failed;
    }
    /* every slot is open, so semRBspace starts at RB_MAXELS */
    rbs->semRBspace = usnewsema(arena, RB_MAXELS);
    if (rbs->semRBspace == NULL)
    {
        perror("usnewsema #2");
        goto failed;
    }
    rbs->lockRBupdate = usnewlock(arena);
    if (rbs->lockRBupdate == NULL)
    {
        perror("usnewlock");
        goto failed;
    }
    /*
    || Turn on metering for both semaphores.  The second call is made
    || only when the first one succeeds.
    */
    if (usctlsema(rbs->semRBdata, CS_METERON) != 0
        || usctlsema(rbs->semRBspace, CS_METERON) != 0)
    {
        perror("usctlsema(METERON)");
        goto failed;
    }
    /* stow the ring buffer structure as the arena info word */
    usputinfo(arena, (void *)rbs);
    return arena;

failed:
    usdetach(arena);
    return NULL;
}
/*
|| Store one item in the ring buffer.  The P operation on semRBspace
|| claims an open slot, blocking while the buffer is full until
|| getElement() has released one.  The V operation on semRBdata then
|| announces the new item.  The lock is held only while the slot is
|| written and rbPut is advanced, so a wait on it should be brief.
*/
void 
putElement(rbElem_t value, rbStruct_t * rbs)
{
    int slot;

    uspsema(rbs->semRBspace);       /* P: take one open slot */
    ussetlock(rbs->lockRBupdate);   /* exclusive use of rbPut */
    slot = rbs->rbPut;
    rbs->theBuffer[slot] = value;
    /* advance the put index, wrapping at the end of the buffer */
    rbs->rbPut = (slot + 1 >= RB_MAXELS) ? 0 : slot + 1;
    usunsetlock(rbs->lockRBupdate);
    usvsema(rbs->semRBdata);        /* V: one more item is available */
}
/*
|| Fetch an item from the ring buffer. This dePletes the count of
|| waiting data items and reVives the count of open slots.  If the
|| ring buffer is empty it blocks until putElement() is called.
|| It can also block briefly on the lock if another process is
|| updating the ring.
||
|| Returns the item that was at theBuffer[rbGet] on entry to the
|| locked section.
*/
rbElem_t getElement(rbStruct_t * rbs)
{
    rbElem_t ret;
    uspsema(rbs->semRBdata);        /* dePlete the available data */
    ussetlock(rbs->lockRBupdate);   /* get exclusive use of rbGet */
    ret = rbs->theBuffer[rbs->rbGet++];
    /*
    || Wrap the GET index at the end of the buffer.  This must reset
    || rbGet, not rbPut: otherwise rbGet would keep growing past
    || RB_MAXELS and the writer's position would be thrown away.
    */
    if (rbs->rbGet >= RB_MAXELS)
        rbs->rbGet = 0;
    usunsetlock(rbs->lockRBupdate); /* release use of lock */
    usvsema(rbs->semRBspace);       /* reViVe the count of open slots */
    return ret;
}
/*
|| Body of the simulated data collection process.
||
|| The process runs at a constant 25 Hz, pacing itself with sginap(4):
|| 100 ticks per second / 4 ticks = 25Hz.  To simulate "data" that is
|| received in bursts, each iteration "receives" from 1 to MAX_BURST
|| items, an average of MAX_BURST/2, for an average data rate of
|| (25*MAX_BURST/2) items/second.  Every item of one burst carries the
|| same random value.
||
|| With MAX_BURST at 16, that gives 200 items/second.  This is the
|| average rate the data writers must achieve, and the ring buffer has
|| to take up the slack during long bursts.
||
|| At a rough approximation, the probability of a burst of length
|| n*MAX_BURST should be (1/MAX_BURST)^n.  (This means that there is
|| a nonzero probability of a burst of any length whatever, and you
|| cannot make a buffer big enough to completely preclude blockages.)
||
|| However, with MAX_BURST==16 and RB_MAXELS==160, the probability of
|| this buffer overflowing should be about 1e-12, provided the data
|| writers keep to the rate.
||
|| The process runs until it has buffered at least inputCount elements
|| (FINAL_COUNT by default; a burst in progress is always completed),
|| then terminates.  main() waits for this, and shuts down the program.
*/
void
inputProcess(void *arena)
{
    rbStruct_t * ring = usgetinfo((usptr_t *)arena);
    int remaining = inputCount;
    int seed = getpid();
    rbElem_t sample;
    int left;

    srandom(seed); /* seed random() with our process ID */
    do
    {
        sginap(4);
        sample = (rbElem_t) random(); /* ASSUMES rbElem_t is long */
        left = 1 + (sample % MAX_BURST);
        while (left != 0)
        {
            putElement(sample, ring);
            --remaining;
            --left;
        }
    } while (remaining > 0);
    /*
    || Idle until the output procs have drained the buffer.  The
    || semaphore count stays positive while data remains, and reaches
    || -2 once both output procs are blocked waiting for more.
    */
    while (ustestsema(ring->semRBdata) > -2)
        sginap(10);
    /* returning ends the process and satisfies wait() in main() */
}
/*
|| This is the body of both simulated data-output processes.
|| Two instances of this code are started. The purpose of starting
|| two is merely to complicate the use of the semaphores -- it is
|| not intended to be realistic.
||
|| Each process sets a repeating itimer with an interval of OUTPUT_TIME
|| microseconds.  That constant determines the "output data rate" that
|| can be achieved.  However, due to integer truncation effects in the
|| precision timer routines, you should not expect fine-grained
|| adjustments of this value to be effective. (Not to mention the
|| interference of other processes in the system, even when this
|| program runs with a real-time priority level.)
||
|| The signal handler is empty. The POSIX sigsuspend() call is used
|| to block until the SIGALRM comes. When it comes, the empty handler
|| is called and then control returns from the sigsuspend().
|| Then one data item is fetched from the ring buffer.
||
|| When the input rate averages 200/sec, each output process needs to
|| get signals at a rate of 100/sec, or an interval of 10000 usec.
|| (Tested on an Indy, the interval had to be 2500 usec to work)
|| 
*/
/*
|| Empty handler for SIGALRM.  It exists only so that delivery of the
|| signal returns control to the interrupted call instead of taking
|| the default action.
||
|| The handler is declared with the (int) prototype that a signal
|| handler installed through sigaction() or signal() is expected to
|| have; the signal number is not used.
*/
void
uponSigalrm(int signo)
{
    (void)signo; /* unused */
    return;
}
/*
|| Body of a simulated data-output process.  "arena" is the arena
|| handle; the ring buffer structure is taken from its info word.
||
|| The process blocks SIGALRM, installs the empty handler for it, and,
|| when outputTimer is nonzero, starts a repeating real-time itimer of
|| outputTimer microseconds.  It then loops forever, fetching one item
|| with getElement() and, when outputTimer is nonzero, waiting in
|| sigsuspend() for the next timer signal.
||
|| The function returns (ending the process) only when setting up the
|| signal mask, the handler or the timer fails; a message is written.
*/
void
outputProcess(void *arena)
{
    rbStruct_t * rbs = usgetinfo((usptr_t *)arena);
    sigset_t alarmSet, emptySet;
    struct sigaction alarmAct = {0};
    struct itimerval timer = {{0, 0}, {0, 0}};
    rbElem_t datum;
    /*
    ||  Prepare an empty set of signals to use with sigsuspend().
    */    
    sigemptyset(&emptySet);
    /*
    || Prepare a mask to block SIGALRM, and apply it.  With SIGALRM
    || blocked, the signal is taken only inside sigsuspend().
    */
    alarmSet = emptySet;
    sigaddset(&alarmSet, SIGALRM);
    if (sigprocmask(SIG_BLOCK, &alarmSet, NULL))
    {
        perror("sigprocmask");
        return;
    }
    /*
    || Set the action for SIGALRM to the empty handler.  The members
    || are assigned by name because their order within struct
    || sigaction differs from one system to another.
    */
    alarmAct.sa_flags = SA_RESTART;
    alarmAct.sa_handler = uponSigalrm;
    sigemptyset(&alarmAct.sa_mask);
    if (sigaction(SIGALRM, &alarmAct, NULL))
    {
        perror("sigaction");
        return;
    }
    /*
    || If a nonzero "processing time" is specified, set a repeating
    || itimer to deliver SIGALRMs regularly.  The interval is split
    || into seconds and microseconds so that a value of one second or
    || more does not put an out-of-range number in tv_usec.
    */
    if (outputTimer)
    {
        timer.it_interval.tv_sec = outputTimer / 1000000;
        timer.it_interval.tv_usec = outputTimer % 1000000;
        timer.it_value = timer.it_interval;
        if (setitimer(ITIMER_REAL, &timer, NULL))
        {
            perror("setitimer");
            return;
        }
    }
    /*
    || Loop getting successive data items. If a nonzero processing
    || time is specified, wait for a timer pop after each one.
     */
    for (;;)
    {
        datum = getElement(rbs);
        (void)datum; /* the simulated output discards the item */
        if (outputTimer)
            sigsuspend(&emptySet);
    }
}
/*
|| Subroutine to display metering info about a lock in a more
|| compact form than usdumplock(3)
||
||  lockName    name to print for the lock
||  lock        the lock whose metering data is fetched
||
|| The metering data is fetched with usctllock(CL_METERFETCH).  The
|| line printed shows lm_hits as the number of acquisitions and
|| (lm_hits - lm_spins) as the number made without waiting, with the
|| latter also given as a truncated percentage of the former.  When the
|| lock was never acquired the percentage is shown as 0 instead of
|| dividing by zero.  If the fetch fails, a "no metering info" line is
|| printed instead.
*/
void
showLockInfo(char *lockName, ulock_t *lock)
{
    lockmeter_t linfo;
    if (0==usctllock(lock,CL_METERFETCH,&linfo))
    {
        int nowaits = linfo.lm_hits - linfo.lm_spins;
        int nwpct = 0;
        if (linfo.lm_hits != 0) /* avoid division by zero on an unused lock */
            nwpct = (100 * nowaits) / linfo.lm_hits;
        printf("Lock %s acquired %d times, %d without waiting (%d%%)\n",
               lockName,   linfo.lm_hits,   nowaits,         nwpct );
    }
    else
        printf("No metering info for lock %s\n",lockName);
}
/*
|| Subroutine to display metering info about a semaphore.
||
||  semaName    name to print for the semaphore
||  sema        the semaphore whose metering data is fetched
||
|| The metering data is fetched with usctlsema(CS_METERFETCH).  Two
|| lines are printed: the number of P operations (sm_psemas) with the
|| count and truncated percentage that did not wait (sm_phits), and the
|| number of V operations (sm_vsemas) with the count and truncated
|| percentage that found a P waiting (sm_vsemas - sm_vnowait).
|| A percentage whose operation count is zero is shown as 0 instead of
|| dividing by zero; this can happen when a process ends before it has
|| used the semaphore.  If the fetch fails, a "no metering info" line
|| is printed instead.
*/
void
showSemaInfo(char *semaName, usema_t *sema)
{
    semameter_t sinfo;
    if (0==usctlsema(sema,CS_METERFETCH,&sinfo))
    {
        int pct, nwait;
        printf("Metering info on sema %s\n",semaName);
        pct = 0;
        if (sinfo.sm_psemas != 0) /* no P operations: nothing to divide */
            pct = (100 * sinfo.sm_phits) / sinfo.sm_psemas;
        printf("  P: %d, %d with no wait (%d%%)\n",
                 sinfo.sm_psemas, sinfo.sm_phits, pct);
        nwait = sinfo.sm_vsemas - sinfo.sm_vnowait;
        pct = 0;
        if (sinfo.sm_vsemas != 0) /* no V operations: nothing to divide */
            pct = (100 * nwait)/sinfo.sm_vsemas;
        printf("  V: %d, %d with P waiting (%d%%)\n",
            sinfo.sm_vsemas, nwait,        pct);
    }
    else
        printf("No metering info for sema %s\n",semaName);
}
/*
|| Helper for main(): send SIGTERM to every started child except the
|| one that wait() already reported (if any), then reap those children
|| so that none of them is still running when the arena is detached.
*/
static void
stopKids(const pid_t *kids, int nKids, pid_t alreadyEnded)
{
    int i;
    for (i = 0; i < nKids; ++i)
    {
        if (kids[i] != alreadyEnded)
            kill(kids[i], SIGTERM);
    }
    for (i = 0; i < nKids; ++i)
    {
        if (kids[i] != alreadyEnded)
            waitpid(kids[i], NULL, 0);
    }
}
/*
|| The main() function:
||      * Gets the arguments, if any.
||      * Sets up the arena.
||      * Starts the 3 processes.
||      * Waits for one child to terminate (normally the inputProcess).
||      * Dumps the lock and semaphore info.
||      * Terminates the remaining children.
||      * Detaches the arena and unlinks its file.
||
|| Returns 0 normally, -2 for a usage error, and -1 when the arena
|| cannot be set up or a child process cannot be started.  The
|| arguments are parsed before the arena is created so that a usage
|| error does not leave the arena file behind, and a failed sproc()
|| goes through the same cleanup as a normal run.
*/
int
main(int argc, char **argv)
{
    pid_t kids[3];
    pid_t ended = (pid_t)-1;    /* child reported by wait(), if any */
    int nKids = 0;              /* number of children started so far */
    int status = 0;
    usptr_t * arena;
    rbStruct_t *rbs;
    int c;
    /*
    || get command line arguments for input count and output delay
    */
    while (-1 != (c = getopt(argc, argv, "c:t:")))
    {
        switch (c)
        {
        case 'c':
            inputCount = atoi(optarg);
            break;
        case 't':
            outputTimer = atoi(optarg);
            break;
        case '?':
        default:
            printf("usage: [-c input data count] [-t output time usec]\n");
            return -2;
        }
    }
    if (outputTimer < 0)
    { /* a negative interval cannot be used to set the itimer */
        printf("usage: [-c input data count] [-t output time usec]\n");
        return -2;
    }
    /*
    || Set up the arena and check that the structures allocated OK.
    */
    arena = allocRBStuff();
    if (!arena)
        return -1; /* allocation failed, message issued */
    rbs = usgetinfo(arena);
    /*
    || Create the inputProcess (simulated data collection).
    */
    kids[nKids] = sproc(inputProcess, PR_SALL, (void *)arena);
    if (-1 == kids[nKids])
    {
        perror("sproc(inputProcess)");
        status = -1;
        goto cleanup;
    }
    ++nKids;
    /*
    || Create the 2 outputProcesses (simulated data reduction).
     */
    kids[nKids] = sproc(outputProcess,  PR_SALL,  (void *)arena);
    if (-1 == kids[nKids])
    {
        perror("sproc(outputProcess 1)");
        status = -1;
        goto cleanup;
    }
    ++nKids;
    kids[nKids] = sproc(outputProcess,  PR_SALL,  (void *)arena);
    if (-1 == kids[nKids])
    {
        perror("sproc(outputProcess 2)");
        status = -1;
        goto cleanup;
    }
    ++nKids;
    /*
    || Wait until a child process (don't care which) ends, and
    || remember which one it was.
    */
    ended = wait(0);
    /*
    || Display the metering information from the lock and semaphores.
    */
    showLockInfo("lockRBupdate",rbs->lockRBupdate);
    showSemaInfo("semRBdata",rbs->semRBdata);
    showSemaInfo("semRBspace",rbs->semRBspace);
    printf("\ndetaching arena file\n");
    /*
    || Clean up: terminate the children that are still running (the
    || output procs are probably blocked on semRBdata at this time).
    || Then detach the arena and unlink its file.
    */
cleanup:
    stopKids(kids, nKids, ended);
    usdetach(arena);
    unlink(ARENA_FILE);
    return status;
}


Next | Prev | Up | Top | Contents | Index