home *** CD-ROM | disk | FTP | other *** search
- Newsgroups: comp.sources.misc
- From: alan@tharr.UUCP (Alan Saunders)
- Subject: v25i025: QBATCH - a queued batch processing system for UNIX, Part06/06
- Message-ID: <1991Nov5.034922.5231@sparky.imd.sterling.com>
- X-Md4-Signature: b01840a9111880e2bf66690532ec6ffc
- Date: Tue, 5 Nov 1991 03:49:22 GMT
- Approved: kent@sparky.imd.sterling.com
-
- Submitted-by: alan@tharr.UUCP (Alan Saunders)
- Posting-number: Volume 25, Issue 25
- Archive-name: QBATCH/part06
- Environment: UNIX
-
- #! /bin/sh
- # This is a shell archive. Remove anything before this line, then unpack
- # it by saving it into a file and typing "sh file". To overwrite existing
- # files, type "sh file -c". You can also feed this as standard input via
- # unshar, or by typing "sh <file", e.g.. If this archive is complete, you
- # will see the following message at the end:
- # "End of archive 6 (of 6)."
- # Contents: src/qp.c
- # Wrapped by root@vfib_d on Thu Oct 31 16:36:22 1991
- PATH=/bin:/usr/bin:/usr/ucb ; export PATH
- if test -f 'src/qp.c' -a "${1}" != "-c" ; then
- echo shar: Will not clobber existing file \"'src/qp.c'\"
- else
- echo shar: Extracting \"'src/qp.c'\" \(16216 characters\)
- sed "s/^X//" >'src/qp.c' <<'END_OF_FILE'
- X/************************************************************************/
- X/* */
- X/* qp .. queue process. start a process engine for a given queue */
- X/* */
- X/* usage: qp [-r] qname */
- X/* */
- X/* Copyright (c) Vita Services 1990 */
- X/* (c) Vita Fibres 1990 1991 */
- X/* */
- X/************************************************************************/
- X
- X#include "qbatch.h"
- X#include <malloc.h>
- int fpq = 0,
- X fplck = 0,
- X fptemp = 0;
- X result,
- X interrupt = 0;
- XFILE *fpin,
- X *mon;
- struct tm *jtimes;
- char exit_message[64];
- char envflg[5], envvar[255];
- char temp[16],
- X *jcl,
- X *jcwd = NULL;
- int i,
- X j,
- X k,
- X rflag = 0;
- pid_t ppid;
- extern int had_usersig, lastflags;
- int seen_usersig;
- extern pid_t childid;
- char queue[128];
- char buff[128];
- char buff1[255];
- char path[128];
- char qbc[10];
- unsigned long q_queued, q_real, q_user, q_system;
- char qentry[12];
- char *args[10],
- X *envs[50],
- X *queuename;
- X
- char * print_time(val)
- unsigned long val;
- X{
- X int temp;
- X char sval[8];
- X static char tval[32];
- X *tval = 0;
- X temp = val/360000;
- X if (temp)
- X {
- X sprintf(sval, "%d", temp);
- X strcpy (tval, sval);
- X strcat(tval, ":");
- X val%=360000;
- X }
- X temp = val/6000;
- X if ((temp) || (*tval))
- X {
- X if (*tval) sprintf(sval, "%02d", temp);
- X else sprintf(sval, "%d", temp);
- X strcat (tval, sval);
- X strcat (tval, ":");
- X val%=6000;
- X }
- X temp = val/100;
- X if (*tval) sprintf(sval, "%02d", temp);
- X else sprintf(sval, "%d", temp);
- X strcat (tval, sval);
- X strcat (tval, ".");
- X temp = val%100;
- X sprintf(sval, "%02d", temp);
- X strcat (tval, sval);
- X return(tval);
- X}
- void handle_flags ()
- X{
- X int oldaction, newaction;
- X had_usersig ++;
- X fptemp = open (queue, O_RDONLY);
- X read (fptemp, &head, sizeof (head)); /* Get header */
- X close (fptemp);
- X if ((lastflags&qh_action) == (head.qh_flags&qh_action))
- X {
- X lastflags=head.qh_flags;
- X return;
- X }
- X oldaction = lastflags & qh_action;
- X newaction = head.qh_flags & qh_action;
- X if ((newaction & qh_kill) != 0)
- X {
- X if (childid)
- X {
- X kill_child_group();
- X }
- X head.qh_flags -= qh_kill;
- X }
- X if ((newaction & qh_halt) != (oldaction & qh_halt))
- X {
- X if (childid)
- X {
- X toggle_halt_status(newaction);
- X }
- X }
- X lastflags = head.qh_flags;
- X}
- X
- main (argc, argv, envp)
- int argc;
- char *argv[],
- X *envp[];
- X
- X{
- X if (getuid() != 0)
- X {
- X fprintf (stderr, "Must be root to process a queue\n");
- X exit (-1);
- X }
- X
- X if (argc < 2)
- X {
- X fprintf (stderr, "Usage qp <qname>\n");
- X qb_term (-1);
- X }
- X if (strcmp(argv[1], "-v") == 0) q_version();
- X queuename = argv[1];
- X strcpy (queue, QUEUEPATH);
- X strcat (queue, queuename);
- X fpq = open (queue, O_RDWR);
- X if (fpq == -1)
- X {
- X perror ("cannot access queue ");
- X qb_term (-1);
- X }
- X qb_setterm();
- X q_lock(fpq);
- X read (fpq, &head, sizeof (head));
- X if (bad_queue()) qb_exit(-1);
- X if (head.qh_pid != 0)
- X {
- X strcpy(buff, QUEUEPATH);
- X strcat(buff, ".");
- X strcat(buff, queuename);
- X strcat(buff, ".lck");
- X fplck = open (buff, O_RDWR+O_CREAT, 0644);
- X if (fplck == -1)
- X {
- X fprintf (stderr, "Cannot open queue lockfile!\n");
- X qb_exit(-1);
- X }
- X if (q_islock(fplck))
- X {
- X /* queue lock is active, let's just check the pid */
- X read (fplck, &childid, sizeof(childid));
- X if (childid != head.qh_pid)
- X {
- X fprintf (stderr, "You have a problem!!\n");
- X fprintf (stderr, "A lock is held on the queue lock file : %s ", buff);
- X fprintf(stderr, "by an activity (%d)\n", childid);
- X fprintf (stderr, "Which does not match that of the qp engine in ");
- X fprintf (stderr, "the queue header (%d)\n", head.qh_pid);
- X qb_exit(-1);
- X }
- X printf ("%s process activity already running .. pid = %d\n", queuename, head.qh_pid);
- X if (head.qh_flags & qh_stop)
- X {
- X printf ("But stop flag is set .. do you wish to clear it (y/n)?");
- X i = getchar ();
- X if (i == 'y' || i == 'Y')
- X {
- X head.qh_flags ^= qh_stop;
- X lseek (fpq, 0, SEEK_SET);
- X write (fpq, &head, sizeof (head));
- X }
- X }
- X q_unlock(fpq);
- X close (fpq);
- X fpq = 0;
- X qb_resetterm();
- X exit (0);
- X }
- X else q_unlock(fplck);
- X *buff = 0;
- X close (fplck);
- X fplck = 0;
- X }
- X q_unlock(fpq);
- X close (fpq);
- X fpq = 0;
- X/*
- X We are in a position to actually process the queue, so to free the
- X controlling vdu, we'll fork into the background, and terminate the
- X foreground
- X*/
- X ppid = fork ();
- X if (ppid == -1)
- X {
- X perror ("Cannot create process activity! ");
- X qb_resetterm();
- X exit (-1);
- X }
- X if (ppid > 0)
- X {
- X printf ("%s process activity running .. pid = %d\n", queuename, ppid);
- X qb_resetterm();
- X exit (0);
- X }
- X
- X/* Now we're in the background .. we can get to work! */
- X
- X qb_setterm();
- X fpq = open (queue, O_RDWR);
- X if (fpq == -1) perror ("can't open queue");
- X q_lock(fpq);
- X result = read (fpq, &head, sizeof (head));
- X if (result == -1) perror ("can't read queue");
- X ppid = getpid();
- X /* set up system queue lock */
- X strcpy(buff, QUEUEPATH);
- X strcat(buff, ".");
- X strcat(buff, queuename);
- X strcat(buff, ".lck");
- X fplck = open (buff, O_RDWR + O_CREAT, 0644);
- X q_lock(fplck);
- X write (fplck, &ppid, sizeof(ppid));
- X fsync(fplck);
- X head.qh_pid = ppid; /* set up header data .. */
- X head.qh_flags &= (qh_enabled+qh_fixed); /* ,..flags */
- X head.qh_proc = time ((time_t *) 0); /* set up start time */
- X head.qh_queued = 0;
- X head.qh_real = 0;
- X head.qh_user = 0;
- X head.qh_system = 0;
- X head.qh_jobcount = 0;
- X result = lseek (fpq, 0, SEEK_SET);
- X if (result == -1) perror ("can't lseek queue");
- X result = write (fpq, &head, sizeof (head)); /* and write it away */
- X if (result == -1) perror ("can't write queue");
- X q_unlock(fpq);
- X result = close (fpq);
- X if (result == -1) perror ("can't close queue");
- X fpq = 0;
- X qb_setuser(handle_flags);
- X while (1)
- X { /* and enter main loop */
- X/*
- X don't bother locking here, we're only checking status
- X*/
- X fpq = open (queue, O_RDWR);
- X read (fpq, &head, sizeof (head));
- X if (head.qh_noentries == 0)
- X { /* nothing to do? */
- X while ((head.qh_noentries == 0)
- X ||(head.qh_flags&qh_halt != 0))
- X { /* wait until there is */
- X close (fpq);
- X fpq = 0;
- X if (! had_usersig) qb_pause(); /* wait for a signal from a support program */
- X else had_usersig --;
- X if (lastflags & qh_stop) /* stop flag set */
- X {
- X printf ("%s process activity Stopped .. pid was %d\n", queuename, ppid);
- X fpq = open (queue, O_RDWR);
- X q_lock(fpq);
- X read (fpq, &head, sizeof (head));
- X lseek (fpq, 0, SEEK_SET);
- X head.qh_pid = 0;
- X write (fpq, &head, sizeof (head));
- X q_unlock(fpq);
- X close (fpq); fpq = 0;
- X q_unlock(fplck);
- X close(fplck);
- X qb_term (0);
- X }
- X fpq = open (queue, O_RDWR);
- X read (fpq, &head, sizeof (head));
- X }
- X }
- X close (fpq);
- X fpq = 0;
- X/*
- X we should only get here if there's a job in the queue
- X*/
- X fpq = open (queue, O_RDWR);
- X q_lock(fpq);
- X read (fpq, &head, sizeof (head));
- X/*
- X just in case a cancel managed to get in while queue unlocked.
- X*/
- X if (head.qh_noentries == 0)
- X {
- X q_unlock(fpq);
- X close (fpq);
- X fpq = 0;
- X }
- X else
- X { /* get entry and fork it */
- X read (fpq, &entry, sizeof (entry));
- X if (entry.qe_monitor[0] != 0)
- X {
- X mon = fopen (entry.qe_monitor, "a");
- X }
- X else
- X {
- X mon = fopen (head.qh_defmon, "a");
- X }
- X head.qh_start = time ((time_t *) 0); /* set up start time */
- X q_queued = head.qh_start - entry.qe_submitted;
- X jtimes = (struct tm *)localtime (&head.qh_start);
- X fprintf (mon, "##\n## Queue %s entry %d (uid = %d : %s) Job: %s\n"
- X ,queuename, entry.qe_jobno, entry.qe_uid, entry.qe_uname, entry.qe_jobname);
- X fprintf (mon, "## Started %s##\n" ,asctime (jtimes));
- X fclose (mon);
- X entry.qe_repcount++; /* increment count */
- X/*
- X set up path to jcl
- X*/
- X strcpy (path, head.qh_spool);
- X if (path[strlen (path) - 1] != '/')
- X strcat (path, "/");
- X strcat (path, entry.qe_jcl);
- X if (access (path, F_OK) == -1)
- X {
- X perror ("Can't access jcl ");
- X if (head.qh_flags &qh_repeat != 0)
- X head.qh_flags -= qh_repeat;
- X }
- X else
- X {
- X qb_set_timer();
- X childid = fork ();
- X if (childid == 0)
- X {
- X/*
- X fork returned 0 .. must be the child
- X*/
- X qb_setpgrp (); /* create new process
- X group for child */
- X fpin = fopen(path, "r");
- X fgets (buff, 128, fpin);
- X if ((buff[0] == '#') && (buff[1] == '!') && (buff[2] == 'L'))
- X {
- X /* job is linked, jcl is actually the file pointed to */
- X fclose (fpin);
- X strcpy (path, &buff[3]);
- X fpin = fopen (path, "r");
- X if (fpin == NULL)
- X fprintf (stderr, "Cannot access linked jcl\n");
- X exit (-1);
- X }
- X i = 2, j = 0;
- X while (1) /* parse strings in interpreter line */
- X {
- X args[j] = buff+i;
- X for (;((buff[i] != ' ') && (buff[i] != '\n')); i++){};
- X k = buff[i];
- X buff[i] = 0;
- X i++;
- X j++;
- X if (k == '\n') break;
- X }
- X args[j++] = path;
- X args[j] = (char *) NULL;
- X strcpy (queue, "QUEUE=");
- X strcat (queue, queuename);
- X for (j=0; j< 50; j++) envs[j] = (char *) NULL;
- X j = 0;
- X while (1) /* transfer environment variables (up to 46) */
- X {
- X fgets (buff1, 255, fpin);
- X sscanf (buff1, "%s %s", envflg, envvar);
- X if (strcmp (envflg, "#ENV")) break; /* no more */
- X envs [j] = malloc(strlen(envvar)+1);
- X strcpy (envs[j++], envvar);
- X if (j > 45) break; /* can't fit any more in table */
- X }
- X fclose (fpin);
- X envs[j++] = queue;
- X sprintf (qbc, "QBC=%u", entry.qe_repcount);
- X envs[j++] = qbc;
- X sprintf (qentry, "QENTRY=%d", entry.qe_jobno);
- X envs[j++] = qentry;
- X j = 0;
- X jtimes = (struct tm *)localtime (&head.qh_start);
- X if (entry.qe_monitor[0] != 0)
- X {
- X mon = freopen (entry.qe_monitor, "a", stdout);
- X mon = freopen (entry.qe_monitor, "a", stderr);
- X }
- X else
- X {
- X mon = freopen (head.qh_defmon, "a", stdout);
- X mon = freopen (head.qh_defmon, "a", stderr);
- X }
- X if (setuid (entry.qe_uid) != 0)
- X {
- X perror ("cannot change uid!!");
- X exit (-1);
- X }
- X setgid (entry.qe_gid);
- X nice (head.qh_priority); /* set priority */
- X if (execve (args[0], args, envs) == -1)
- X {
- X perror ("cannot execute job! ");
- X if (head.qh_flags &qh_repeat != 0)
- X head.qh_flags -= qh_repeat;
- X exit (-1);
- X }
- X }
- X if (childid == -1)
- X {
- X perror ("cannot execute job! ");
- X if (head.qh_flags &qh_repeat != 0)
- X head.qh_flags -= qh_repeat;
- X }
- X/*
- X set child on track, now update queue and wait for it to finish
- X*/
- X head.qh_flags &= (qh_enabled+qh_fixed); /* ,..flags */
- X entry.qe_status = childid; /* and entry status */
- X lseek (fpq, 0, SEEK_SET);
- X head.qh_pid = ppid;
- X write (fpq, &head, sizeof (head)); /* and write them away */
- X write (fpq, &entry, sizeof (entry));
- X q_unlock(fpq);
- X close (fpq);
- X fpq = 0;
- X
- X q_wait(exit_message);
- X qb_get_timer(&q_real, &q_user, &q_system);
- X childid = 0;
- X if (entry.qe_monitor[0] != 0)
- X {
- X mon = fopen (entry.qe_monitor, "a");
- X }
- X else
- X {
- X mon = fopen (head.qh_defmon, "a");
- X }
- X head.qh_start = time ((time_t *) 0); /* set up end time */
- X jtimes = (struct tm *)localtime (&head.qh_start);
- X fprintf (mon, "##\n## Queue %s entry %d (uid = %d : %s) Job: %s\n"
- X ,queuename, entry.qe_jobno, entry.qe_uid, entry.qe_uname, entry.qe_jobname);
- X fprintf (mon, "## stopped %s" ,asctime (jtimes));
- X fprintf(mon, "## Times:- Queued: %15s", print_time(q_queued));
- X fprintf(mon, " Real: %15s\n", print_time(q_real));
- X fprintf(mon, "## User: %15s", print_time(q_user));
- X fprintf(mon, " System: %15s\n", print_time(q_system));
- X if (exit_message != NULL)
- X fprintf (mon, "## (%s)\n", exit_message);
- X fprintf (mon,"##\n");
- X fclose (mon);
- X }
- X if (entry.qe_notify > 0)
- X {
- X /* notify user */
- X sprintf (buff, "/usr/bin/bv/jobdone %s %d %d %s %s \"%s\"", queuename, entry.qe_jobno, entry.qe_notify, entry.qe_uname, entry.qe_tty, entry.qe_jobname);
- X system (buff);
- X }
- X fpq = open (queue, O_RDWR);
- X q_lock(fpq);
- X read (fpq, &head, sizeof(head));
- X head.qh_start = 0;
- X head.qh_queued += (q_queued*100);
- X head.qh_real += q_real;
- X head.qh_user += q_user;
- X head.qh_system += q_system;
- X head.qh_jobcount ++;
- X if ((head.qh_flags & qh_repeat) == 0)
- X {
- X head.qh_noentries--;
- X/*
- X get rid of first entry in queue
- X*/
- X lseek (fpq, 0, SEEK_SET);
- X head.qh_pid = ppid;
- X write (fpq, &head, sizeof (head));
- X if (head.qh_noentries > 0)
- X {
- X for (i = 0; i < head.qh_noentries; i++)
- X {
- X lseek (fpq, (sizeof (head) + ((i + 1) * sizeof (entry))), SEEK_SET);
- X read (fpq, &entry, sizeof (entry));
- X lseek (fpq, (sizeof (head) + (i * sizeof (entry))), SEEK_SET);
- X write (fpq, &entry, sizeof (entry));
- X }
- X }
- X ftruncate (fpq, sizeof (head) + head.qh_noentries * sizeof (entry));
- X/*
- X get rid of jcl file
- X*/
- X chmod (path, 0755);
- X unlink (path);
- X }
- X else
- X {
- X read (fpq, &entry, sizeof(entry));
- X lseek (fpq, 0, SEEK_SET);
- X head.qh_pid = ppid;
- X write (fpq, &head, sizeof (head));
- X entry.qe_status = 0;
- X write (fpq, &entry, sizeof(entry));
- X }
- X if (head.qh_flags & qh_stop)
- X {
- X printf ("%s process activity stopped .. pid was %d\n", queuename, ppid);
- X lseek (fpq, 0, SEEK_SET);
- X head.qh_pid = 0;
- X write (fpq, &head, sizeof (head));
- X qb_term (0);
- X }
- X q_unlock(fpq);
- X close (fpq);
- X fpq = 0;
- X }
- X }
- X}
- END_OF_FILE
- if test 16216 -ne `wc -c <'src/qp.c'`; then
- echo shar: \"'src/qp.c'\" unpacked with wrong size!
- fi
- # end of 'src/qp.c'
- fi
- echo shar: End of archive 6 \(of 6\).
- cp /dev/null ark6isdone
- MISSING=""
- for I in 1 2 3 4 5 6 ; do
- if test ! -f ark${I}isdone ; then
- MISSING="${MISSING} ${I}"
- fi
- done
- if test "${MISSING}" = "" ; then
- echo You have unpacked all 6 archives.
- rm -f ark[1-9]isdone
- else
- echo You still need to unpack the following archives:
- echo " " ${MISSING}
- fi
- ## End of shell archive.
- exit 0
-
- exit 0 # Just in case...
- --
- Kent Landfield INTERNET: kent@sparky.IMD.Sterling.COM
- Sterling Software, IMD UUCP: uunet!sparky!kent
- Phone: (402) 291-8300 FAX: (402) 291-4362
- Please send comp.sources.misc-related mail to kent@uunet.uu.net.
-