home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
The World of Computer Software
/
World_Of_Computer_Software-02-385-Vol-1of3.iso
/
c
/
condor40.zip
/
CONDOR
/
src
/
condor_shadow
/
shadow.c
< prev
next >
Wrap
C/C++ Source or Header
|
1989-09-06
|
30KB
|
1,190 lines
/*
** Copyright 1986, 1987, 1988, 1989 University of Wisconsin
**
** Permission to use, copy, modify, and distribute this software and its
** documentation for any purpose and without fee is hereby granted,
** provided that the above copyright notice appear in all copies and that
** both that copyright notice and this permission notice appear in
** supporting documentation, and that the name of the University of
** Wisconsin not be used in advertising or publicity pertaining to
** distribution of the software without specific, written prior
** permission. The University of Wisconsin makes no representations about
** the suitability of this software for any purpose. It is provided "as
** is" without express or implied warranty.
**
** THE UNIVERSITY OF WISCONSIN DISCLAIMS ALL WARRANTIES WITH REGARD TO
** THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
** FITNESS. IN NO EVENT SHALL THE UNIVERSITY OF WISCONSIN BE LIABLE FOR
** ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
** WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
** ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
** OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
**
** Authors: Allan Bricker and Michael J. Litzkow,
** University of Wisconsin, Computer Sciences Dept.
**
*/
#include <stdio.h>
#include <signal.h>
#include <pwd.h>
#include <netdb.h>
#include <syscall.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/buf.h>
#include <sys/stat.h>
#include <sys/errno.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <sys/resource.h>
#include <rpc/types.h>
#include <rpc/xdr.h>
#include "sched.h"
#include "debug.h"
#include "except.h"
#include "trace.h"
#include "fileno.h"
#include "files.h"
#include "proc.h"
#include "expr.h"
#include "exit.h"
#include "condor_types.h"
#include "clib.h"
#ifdef NDBM
#include <ndbm.h>
#else NDBM
#include "ndbm_fake.h"
#endif NDBM
static char *_FileName_ = __FILE__; /* Used by EXCEPT (see except.h) */
/*
** Old-style (K&R) declarations of library and project routines used in
** this file; no prototypes are in scope for them.
*/
char *getenv(), *index(), *strcat(), *strcpy(), *param();
XDR *xdr_Init();
EXPR *scan(), *create_expr();
ELEM *create_elem();
CONTEXT *create_context(), *build_context();
FILE *popen();
extern int LockFd;
extern char MsgBuf[];
int whoami();
/* This process's pid; refreshed in the child after fork() */
int MyPid;
/* Child only: write end of the status pipe to the parent shadow */
int MyPipe;
/* Child only: write end of the log pipe to the parent shadow */
int LogPipe;
/* The job being shadowed, as read from the job queue by start_job() */
PROC Proc;
/* Spool directory, from the SPOOL configuration parameter */
char *Spool;
/* Path of the job queue, "<Spool>/job_queue" (built in main()) */
char QueueName[MAXPATHLEN];
/* Handle on the open job queue, or NULL while it is closed */
DBM *QueueD = NULL, *OpenJobQueue();
/*
** Open the job queue into q unless it is already open; EXCEPT on failure.
** NOTE(review): expands to a bare { } block rather than do { } while(0),
** so "if (x) OPENJOBQUEUE(...); else ..." would not compile.
*/
#define OPENJOBQUEUE(q, name, flags, mode) { \
if( (q) == NULL ) { \
(q) = OpenJobQueue(name, flags, mode); \
if( (q) == NULL ) { \
EXCEPT("OpenJobQueue(%s)", name); \
} \
} \
}
/* Close the job queue and mark the handle as closed */
#define CLOSEJOBQUEUE(q) { CloseJobQueue(q); (q) = NULL; }
/* uid/gid of the "condor" account (set by get_client_ids()) */
int CondorGid;
int CondorUid;
/* uid/gid of the job's owner (set by get_client_ids()) */
int ClientUid;
int ClientGid;
/* Usage totals copied from the job's queue entry by start_job() */
struct rusage LocalUsage;
struct rusage RemoteUsage;
char CkptName[ MAXPATHLEN ]; /* The name of the ckpt file */
char ICkptName[ MAXPATHLEN ]; /* The name of the initial ckpt file */
char RCkptName[ MAXPATHLEN ]; /* The name of the returned ckpt file */
char TmpCkptName[ MAXPATHLEN ]; /* The name of the temp ckpt file */
/*
** Pre-ANSI BSD declaration of sprintf returning char *.
** NOTE(review): conflicts with an ANSI <stdio.h>, which declares int.
*/
char *sprintf();
#ifdef NOTDEF
int InitialJobStatus = -1;
#endif NOTDEF
/*
** The job's exit status and resource usage.  The child shadow writes them
** to the status pipe and the parent shadow reads them back.
** NOTE(review): presumably filled in by the remote-syscall code
** (do_REMOTE_syscall) when the job exits; that code is not visible here.
*/
union wait JobStatus;
struct rusage JobRusage;
/* Return value of fork(): the child's pid in the parent, 0 in the child */
int ChildPid;
int DoCleanup();
/* XDR stream on RSC_SOCK to the remote side, set up by send_job() */
XDR *xdr_RSC, *RSC_ShadowInit();
/*
** Log the expected command line and terminate with exit status 1.
*/
usage()
{
	dprintf( D_ALWAYS, "Usage: shadow host cluster proc\n" );

	exit( 1 );
}
/*
** Shadow entry point.
**
** argv[1] is the execution host and argv[2]/argv[3] are the job's cluster
** and proc ids. A fifth argument is accepted but not used here.
**
** The parent reads the job from the queue and ships it to the remote host
** (start_job), then forks. The child services the job's remote system
** calls (HandleSyscalls). The parent collects the outcome over two pipes
** (ParentShadow).
*/
/*ARGSUSED*/
main(argc,argv,envp)
int		argc;
char	*argv[];
char	*envp[];
{
	int		p[2];		/* child -> parent: status, rusage, core dir */
	int		log[2];		/* child -> parent: child's debug output */

	_EXCEPT_Cleanup = DoCleanup;
	MyPid = getpid();

	config( argv[0], (CONTEXT *)0 );
	dprintf_config( "SHADOW", SHADOW_LOG );
	DebugId = whoami;

	dprintf( D_ALWAYS, "********** Shadow Parent starting up **********\n" );

	/*
	** Validate the argument count before touching argv[1]. With too few
	** arguments, argv[1] is NULL (argc == 1) or past the end of argv.
	*/
	if( (argc != 4) && (argc != 5) )
		usage();
	dprintf( D_ALWAYS, "Server = %s\n", argv[1] );

	if( setreuid(0,0) < 0 ) {
		EXCEPT("setreuid(0,0)");
	}

	Spool = param( "SPOOL" );
	if( Spool == NULL ) {
		EXCEPT( "Spool directory not specified in config file" );
	}
	(void)sprintf( QueueName, "%s/job_queue", Spool );

	start_job( argv[1], argv[2], argv[3] );

	if( pipe(p) < 0 ) {
		EXCEPT("Could not establish pipe");
	}
	if( pipe(log) < 0 ) {
		EXCEPT( "Could not extablish log pipe");
	}

	/* Don't want to share log file lock between child and parent */
	(void)close( LockFd );
	LockFd = -1;

	ChildPid = fork();
	if( ChildPid < 0 ) {
		EXCEPT("fork");
	}

	if( ChildPid != 0 ) {	/* Parent */
		(void)close( log[1] );
		(void)close( p[1] );
		ParentShadow( p[0], log[0] );
	} else {				/* Child */
		_EXCEPT_Cleanup = NULL;		/* Parent will do any needed cleanup */
		(void)close( log[0] );
		(void)close( p[0] );
		MyPipe = p[1];
		LogPipe = log[1];
		MyPid = getpid();

		/* Child logs go through the log pipe; the parent writes the file */
		DebugLock = NULL;
		DebugFile = NULL;
		DebugFP = fdopen( log[1], "w" );
		dprintf( D_ALWAYS, "********** Shadow Child starting up **********\n" );
		HandleSyscalls( p[1] );
	}
}
/*
** Child half of the shadow.
**
** Setup:
**   - chdir() into and chroot() to Proc.rootdir.
**   - Set the group list (owner's groups plus CondorGid), the real gid and
**     the real and effective uid to the job's owner.
**   - chdir() to Proc.iwd and open the job's standard files.
**
** It then services remote system calls until do_REMOTE_syscall() returns
** < 0. Afterwards it writes to `pipe`, in order:
**   - JobStatus;
**   - JobRusage;
**   - only when the job was killed by a signal other than SIGQUIT and
**     dumped core: the NUL-terminated current directory ("" for "/").
**
** Exit status:
**   - JOB_EXITED when w_termsig is 0;
**   - JOB_COREDUMPED or JOB_KILLED for any other signal except SIGQUIT;
**   - JOB_CKPTED for SIGQUIT.
** Note that JOB_EXITED is used even when w_coredump is set, and in that
** case no directory is written.
*/
HandleSyscalls( pipe )
int pipe;
{
if( setreuid(0,0) < 0 ) {
EXCEPT("setreuid(0,0)");
}
dprintf(D_FULLDEBUG, "HandleSyscalls: about to chdir(%s)\n", Proc.rootdir);
if( chdir(Proc.rootdir) < 0 ) {
PERM_ERR( "Can't access \"%s\"", Proc.rootdir );
EXCEPT("chdir(%s)", Proc.rootdir);
}
dprintf(D_FULLDEBUG, "HandleSyscalls: about to chroot(%s)\n", Proc.rootdir);
if( chroot(Proc.rootdir) < 0 ) {
PERM_ERR( "Can't access \"%s\"", Proc.rootdir );
EXCEPT("chroot(%s)", Proc.rootdir);
}
/*
** Set up group array for job owner, but include Condor's gid
*/
/*
** dprintf(D_ALWAYS,"Shadow: about to initgroups(%s, %d)\n",
** Proc.owner,CondorGid);
*/
if( initgroups(Proc.owner, CondorGid) < 0 ) {
EXCEPT("Can't initgroups(%s, %d)", Proc.owner, CondorGid);
}
/*
** dprintf(D_ALWAYS,"Shadow: about to setrgid(%d)\n", ClientGid);
*/
/* Set the rgid to job's owner - keep him out of trouble */
if( setrgid(ClientGid) < 0 ) {
EXCEPT( "setrgid(%d)", ClientGid);
}
/*
dprintf(D_ALWAYS,"Shadow: about to setruid(%d,%d)\n", ClientUid,ClientUid );
*/
/* Set both ruid and euid to job's owner - keep him out of trouble */
if( setreuid(ClientUid,ClientUid) < 0 ) {
EXCEPT( "setreuid(%d, %d)", ClientUid, ClientUid );
}
/* Proc.iwd is resolved inside the new root */
dprintf(D_FULLDEBUG, "HandleSyscalls: about to chdir(%s)\n", Proc.iwd);
if( chdir(Proc.iwd) < 0 ) {
PERM_ERR( "Can't access \"%s\"", Proc.iwd );
EXCEPT("chdir(%s)", Proc.iwd);
}
dprintf(D_SYSCALLS, "Shadow: Starting to field syscall requests\n");
errno = 0;
open_std_files( &Proc );
for(;;) { /* get a request and fulfill it */
if( do_REMOTE_syscall() < 0 ) {
dprintf(D_SYSCALLS, "Shadow: do_REMOTE_syscall returned < 0\n");
break;
}
}
/* Report to the parent: JobStatus first, then JobRusage */
{
int s;
errno = 0;
if( (s=write(pipe, (char *)&JobStatus, sizeof(JobStatus))) !=
sizeof(JobStatus) ) {
dprintf( D_ALWAYS, "pipe = %d, status = %d\n", pipe, s );
EXCEPT("Could not communicate JobStatus to the parent shadow");
}
}
if( write(pipe, (char *)&JobRusage, sizeof(JobRusage)) !=
sizeof(JobRusage) ) {
EXCEPT("Could not communicate JobRusage to the parent shadow");
}
dprintf(D_ALWAYS,
"Shadow: Job %d.%d exited, termsig = %d, coredump = %d, retcode = %d\n",
Proc.id.cluster, Proc.id.proc, JobStatus.w_termsig,
JobStatus.w_coredump, JobStatus.w_retcode );
if( JobStatus.w_termsig == 0 ) {
dprintf( D_ALWAYS, "********** Shadow Child Exiting (JOB_EXITED)\n" );
exit( JOB_EXITED );
}
/*
** The checkpoint code will cause the job to exit with a SIGQUIT.
*/
/*
** We should put a description of the situation into
** (chrooted) /tmp/Shadow.status.pid to be mailed to the
** user later on.
*/
if( JobStatus.w_termsig != SIGQUIT ) {
if( JobStatus.w_coredump ) {
/*
** Send the core's directory, relative to the chroot, to the parent.
** The parent prefixes Proc.rootdir when it reports it.
*/
char cwd[ MAXPATHLEN ];
int blen;
/* BSD getwd() leaves an error message in cwd on failure */
if( getwd(cwd) == NULL ) {
EXCEPT("getwd returned '%s'", cwd);
}
/* blen counts the terminating NUL, which is sent too */
blen = strlen(cwd) + 1;
if( blen == 2 ) { /* cwd == "/". Send back a null string */
cwd[0] = '\0';
blen = 1;
}
if( write(pipe, cwd, blen) != blen ) {
EXCEPT("Could not send core directory to the parent shadow");
}
dprintf( D_ALWAYS,
"********** Shadow Child Exiting (JOB_COREDUMPED)\n" );
exit( JOB_COREDUMPED );
} else {
dprintf( D_ALWAYS,
"********** Shadow Child Exiting (JOB_KILLED)\n" );
exit( JOB_KILLED );
}
}
dprintf(D_ALWAYS, "Shadow: Job %d.%d exited, new checkpoint was made.\n",
Proc.id.cluster, Proc.id.proc );
dprintf( D_ALWAYS, "********** Shadow Child Exiting (JOB_CKPTED)\n" );
exit( JOB_CKPTED );
}
/*
** Parent half of the shadow.
**
** Waits with select() on two pipes from the child shadow until both have
** been closed:
**   pipe - carries, in order, JobStatus, JobRusage and, only when the
**          status has w_coredump set, the NUL-terminated directory that
**          holds the core file;
**   log  - carries the child's debug output, consumed by HandleChildLog().
**
** Afterwards it:
**   - reaps the child with wait3();
**   - decides the job's fate: handle_termination(), a permission error
**     left in MsgBuf, or DoCleanup() for an unexplained death;
**   - updates the job queue and mails the owner if there is a notice;
**   - exits 0.
** SIGTERM is routed to rm() from the start of this function.
*/
ParentShadow(pipe, log)
int		pipe;
int		log;
{
	union wait		status;
	struct rusage	local_rusage;
	char			notification[ BUFSIZ ];
	int				pid;
	int				got_status = 0;
	int				got_rusage = 0;
	fd_set			readfds, template;
	int				cnt;
	int				nfds;
	FILE			*log_fp;
	int				rm();
	int				s;
	char			coredir[ MAXPATHLEN ];
	int				core_needed = 0;

	/*
	** coredir is always handed to handle_termination(), but the child can
	** exit without sending it. For example, it exits JOB_EXITED without
	** writing a directory even when w_coredump is set. Start from a valid
	** empty string so a missing or short directory never leaves garbage.
	*/
	coredir[0] = '\0';

	(void) signal(SIGTERM, rm);

	if( (log_fp=fdopen(log,"r")) == NULL ) {
		EXCEPT( "fdopen(%d,'r')", log );
	}

	dprintf(D_FULLDEBUG, "Shadow: waiting for JobStatus from %d\n", ChildPid);

	nfds = MAX( pipe, log) + 1;
	FD_ZERO( &template );
	FD_SET( pipe, &template );
	FD_SET( log, &template );

	/* Each descriptor is set to -1 when closed; nfds reaches 0 when both are */
	while( nfds ) {
		readfds = template;
		cnt = select( nfds, (int *)&readfds, (int *)0, (int *)0,
											(struct timeval *)0 );
		if( cnt < 0 ) {
			EXCEPT( "Select(%d,0x%x,0,0,0)", nfds, &readfds );
		}

		/* Child log output is handled first; EOF on it closes the log pipe */
		if( log >= 0 && FD_ISSET(log,&readfds) ) {
			if( HandleChildLog(log_fp) < 0 ) {
				FD_CLR(log,&template);
				(void)close( log );
				log = -1;
				nfds = pipe + 1;
			}
			continue;
		}

		errno = 0;
		if( !got_status ) {				/* get the job's exit status */
			if( (s=read(pipe, (char *)&JobStatus, sizeof(JobStatus))) !=
														sizeof(JobStatus) ) {
				dprintf( D_ALWAYS, "pipe = %d, status is %d\n", pipe, s );
				dprintf( D_ALWAYS,
					"Could not communicate JobStatus from the child shadow\n");
				break;
			}
			core_needed = JobStatus.w_coredump ? 1 : 0;
			got_status = 1;
			dprintf(D_FULLDEBUG, "Shadow: waiting for JobRusage from %d\n",
																ChildPid);
		} else if( !got_rusage ) {		/* get the rusage */
			if( read(pipe, (char *)&JobRusage, sizeof(JobRusage)) !=
														sizeof(JobRusage) ) {
				EXCEPT("Could not communicate JobRusage from the child shadow");
			}
			got_rusage = 1;
			if( !core_needed ) {
				FD_CLR( pipe, &template );
				(void)close( pipe );
				pipe = -1;
				nfds = MAX( pipe, log) + 1;
			}
		} else {						/* get the name of the core directory */
			/*
			** Keep one byte for a terminator. The read may be short, or may
			** return 0 if the child exited without sending a directory.
			*/
			if( (s=read(pipe, coredir, sizeof(coredir) - 1)) < 0 ) {
				EXCEPT("Could not read core directory");
			}
			coredir[s] = '\0';
			FD_CLR( pipe, &template );
			(void)close( pipe );
			pipe = -1;
			nfds = MAX( pipe, log) + 1;
		}
	}

	dprintf(D_FULLDEBUG, "Shadow: waiting for %d to exit\n", ChildPid);
	pid = wait3(&status, 0, &local_rusage);
	if( pid < 0 ) {
		EXCEPT("wait3");
	}
	if( pid != ChildPid ) {
		EXCEPT("wait3 returned pid %d (not %d)", pid, ChildPid);
	}
	dprintf(D_FULLDEBUG, "Shadow: Pid %d returned by wait.\n", pid );

	notification[0] = '\0';
	if( got_status ) {
		handle_termination( notification, coredir );
	} else if( MsgBuf[0] ) {	/* Child shadow perm err, kill job & notify owner */
		Proc.status = COMPLETED;
		dprintf( D_ALWAYS, "MsgBuf = \"%s\"\n", MsgBuf );
		(void)strcpy( notification, MsgBuf );
	} else {					/* Unexplained death of child shadow, assume temp error */
		dprintf(D_ALWAYS, "Child shadow died, no msg, assuming temp error\n" );
		DoCleanup();
	}

	update_job_status( &local_rusage, &JobRusage );

	if( notification[0] ) {
		NotifyUser( notification );
	}

	dprintf( D_ALWAYS, "********** Shadow Parent Exiting **********\n" );
	exit( 0 );
}
/*
** Decide the job's fate from JobStatus, set Proc.status accordingly, and
** write any message for the owner into `notification` (left untouched
** when there is nothing to report).
**
**   w_termsig == 0:  the job finished and becomes COMPLETED. A "core" flag
**                    with exit code ENOEXEC means the file could not be
**                    executed.
**   SIGKILL:         kicked off without a checkpoint; DoCleanup() requeues it.
**   SIGQUIT:         checkpointed. A chrooted job's returned checkpoint is
**                    moved into the spool (MvTmpCkpt) and the job becomes IDLE.
**   other signals:   the job becomes COMPLETED. If it dumped core, the
**                    notice names the core file under `coredir`, prefixed
**                    by Proc.rootdir when the job ran chrooted.
**
** `notification` must have room for the formatted message. The caller
** passes a BUFSIZ buffer.
*/
handle_termination( notification, coredir )
char	*notification;
char	*coredir;
{
	switch( JobStatus.w_termsig ) {

	case 0:			/* If core, bad executable -- otherwise a normal exit */
		if( JobStatus.w_coredump && JobStatus.w_retcode == ENOEXEC ) {
			(void)sprintf( notification, "Job file not executable" );
			dprintf( D_ALWAYS, "Shadow: Job file not executable\n" );
		} else {
			(void)sprintf(notification, "Remote Unix Job exited with status %d",
					JobStatus.w_retcode );
			dprintf(D_ALWAYS, "Shadow: Job exited normally with status %d\n",
				JobStatus.w_retcode );
		}
		Proc.status = COMPLETED;
		break;

	case SIGKILL:	/* Kicked off without a checkpoint */
		dprintf(D_ALWAYS, "Shadow: Job was kicked off without a checkpoint\n" );
		DoCleanup();
		break;

	case SIGQUIT:	/* Kicked off, but with a checkpoint */
		dprintf(D_ALWAYS, "Shadow: Job was checkpointed\n" );
		if( strcmp(Proc.rootdir, "/") != 0 ) {
			MvTmpCkpt();
		}
		Proc.status = IDLE;
		break;

	default:		/* Job exited abnormally */
		if( JobStatus.w_coredump ) {
			if( strcmp(Proc.rootdir, "/") == 0 ) {
				(void)sprintf(notification,
	"Remote Unix Job %d.%d was killed by signal %d\nCore file is %s/core.%d.%d",
					Proc.id.cluster, Proc.id.proc, JobStatus.w_termsig,
					coredir, Proc.id.cluster, Proc.id.proc);
			} else {
				(void)sprintf(notification,
	"Remote Unix Job %d.%d was killed by signal %d\nCore file is %s%s/core.%d.%d",
					Proc.id.cluster, Proc.id.proc, JobStatus.w_termsig,
					Proc.rootdir, coredir, Proc.id.cluster, Proc.id.proc);
			}
		} else {
			(void)sprintf(notification,
				"Remote Unix Job %d.%d was killed by signal %d",
				Proc.id.cluster, Proc.id.proc, JobStatus.w_termsig);
		}
		dprintf(D_ALWAYS, "Shadow: %s\n", notification);
		Proc.status = COMPLETED;
		break;
	}
}
/*
** Render a duration given in seconds (fractions are truncated toward
** zero) as "DDD HH:MM:SS", with days right-aligned in three columns.
**
** The result lives in a static buffer that is overwritten by the next call.
*/
char *
d_format_time( dsecs )
double	dsecs;
{
	static char	answer[25];
	int			remaining, days, hours, minutes;

#define SECONDS	1
#define MINUTES	(60 * SECONDS)
#define HOURS	(60 * MINUTES)
#define DAYS	(24 * HOURS)

	remaining = (int)dsecs;

	/* Peel off each unit in turn; subtracting the quotient equals using % */
	days = remaining / DAYS;
	remaining -= days * DAYS;

	hours = remaining / HOURS;
	remaining -= hours * HOURS;

	minutes = remaining / MINUTES;
	remaining -= minutes * MINUTES;

	(void)sprintf( answer, "%3d %02d:%02d:%02d",
						days, hours, minutes, remaining );
	return answer;
}
/*
** Mail the job's owner (via /bin/mail) a report with `buf`, the
** submission time, and the job's remote and local CPU usage.
**
** Whether mail is sent depends on Proc.notification:
**   NOTIFY_NEVER     never;
**   NOTIFY_ALWAYS    always;
**   NOTIFY_COMPLETE  only if Proc.status is COMPLETED;
**   NOTIFY_ERROR     only if COMPLETED with a non-zero exit code;
**   anything else    the unexpected value is logged and mail is sent.
**
** EXCEPTs if the mailer cannot be started.
*/
NotifyUser( buf )
char	*buf;
{
	FILE	*mailer;
	char	cmd[ BUFSIZ ];
	double	rutime, rstime, lutime, lstime;	/* remote/local user/sys times */
	double	trtime, tltime;					/* Total remote/local time */
	time_t	submitted;

	switch( Proc.notification ) {
		case NOTIFY_NEVER:
			return;
		case NOTIFY_ALWAYS:
			break;
		case NOTIFY_COMPLETE:
			if( Proc.status == COMPLETED ) {
				break;
			} else {
				return;
			}
		case NOTIFY_ERROR:
			if( (Proc.status == COMPLETED) && (JobStatus.w_retcode != 0) ) {
				break;
			} else {
				return;
			}
		default:
			dprintf(D_ALWAYS, "Condor Job %d.%d has a notification of %d\n",
							Proc.id.cluster, Proc.id.proc, Proc.notification );
	}

	(void)sprintf(cmd, "/bin/mail %s", Proc.owner );
	mailer = popen( cmd, "w" );
	if( mailer == NULL ) {
		EXCEPT("Shadow: Cannot do popen(<%s>, <w>", cmd);
	}

	fprintf(mailer, "From: Condor\n" );
	fprintf(mailer, "To: %s\n", Proc.owner );
	fprintf(mailer, "Subject: Condor Job %d.%d\n\n",
									Proc.id.cluster, Proc.id.proc );
	fprintf(mailer, "%s\n\n", buf );

	/*
	** ctime() needs a genuine time_t. The declared type of q_date is not
	** visible here. The original cast of &Proc.q_date to (time_t *) reads
	** the wrong number of bytes whenever the two types differ in size.
	** Copying through a time_t is correct for any integer type.
	*/
	submitted = (time_t)Proc.q_date;
	fprintf(mailer, "Submitted at: %s", ctime( &submitted ) );

	/* Whole seconds only; microseconds are ignored in this report */
	rutime = Proc.remote_usage.ru_utime.tv_sec;
	rstime = Proc.remote_usage.ru_stime.tv_sec;
	trtime = rutime + rstime;

	lutime = Proc.local_usage.ru_utime.tv_sec;
	lstime = Proc.local_usage.ru_stime.tv_sec;
	tltime = lutime + lstime;

	fprintf(mailer, "Remote User Time: %s\n", d_format_time(rutime) );
	fprintf(mailer, "Remote System Time: %s\n", d_format_time(rstime) );
	fprintf(mailer, "Total Remote Time: %s\n\n", d_format_time(trtime));
	fprintf(mailer, "Local User Time: %s\n", d_format_time(lutime) );
	fprintf(mailer, "Local System Time: %s\n", d_format_time(lstime) );
	fprintf(mailer, "Total Local Time: %s\n\n", d_format_time(tltime));

	/* Remote/local CPU ratio, omitted when local time is under a second */
	if( tltime >= 1.0 ) {
		fprintf(mailer, "Leveraging Factor: %2.1f\n", trtime / tltime );
	}

	(void)pclose( mailer );
}
update_job_status( localp, remotep )
struct rusage *localp, *remotep;
{
PROC proc;
OPENJOBQUEUE(QueueD, QueueName, O_RDWR, 0);
LockJobQueue( QueueD, WRITER );
proc.id.cluster = Proc.id.cluster;
proc.id.proc = Proc.id.proc;
if( FetchProc(QueueD,&proc) < 0 ) {
EXCEPT( "Shadow: FetchProc(%d.%d)", proc.id.cluster, proc.id.proc );
}
if( proc.status == REMOVED ) {
dprintf( D_ALWAYS, "Job %d.%d has been removed by condor_rm\n",
proc.id.cluster, proc.id.proc );
if( CkptName[0] != '\0' ) {
dprintf(D_ALWAYS, "Shadow: unlinking Ckpt '%s'\n", CkptName);
(void) unlink( CkptName );
}
if( TmpCkptName[0] != '\0' ) {
dprintf(D_ALWAYS, "Shadow: unlinking TmpCkpt '%s'\n", TmpCkptName);
(void) unlink( TmpCkptName );
}
} else {
update_rusage( &Proc.local_usage, localp );
update_rusage( &Proc.remote_usage, remotep );
if( StoreProc(QueueD,&Proc) < 0 ) {
CLOSEJOBQUEUE( QueueD );
EXCEPT( "StoreProc(%d.%d)", Proc.id.cluster, Proc.id.proc );
}
dprintf( D_ALWAYS, "Shadow: marked job status %d\n", Proc.status );
if( Proc.status == COMPLETED ) {
if( TerminateProc(QueueD,&Proc.id,COMPLETED) < 0 ) {
EXCEPT( "TerminateProc(0x%x,%d.%d,COMPLETED)",
QueueD, Proc.id.cluster, Proc.id.proc );
}
}
}
CLOSEJOBQUEUE( QueueD );
}
/*
** Add the usage in *ru2 into the running totals in *ru1. *ru2 is only read.
**
**   - CPU times (ru_utime, then ru_stime) are summed. One second is
**     carried out of tv_usec when it reaches 1000000; a single carry
**     suffices when both inputs have tv_usec below 1000000.
**   - The memory high-water marks (ru_maxrss, ru_ixrss, ru_idrss,
**     ru_isrss) keep the larger of the two values.
**   - Every other counter is summed.
*/
int
update_rusage( ru1, ru2 )
register struct rusage	*ru1, *ru2;
{
	struct timeval	*total[2];
	struct timeval	*delta[2];
	int				i;

	total[0] = &ru1->ru_utime;	delta[0] = &ru2->ru_utime;
	total[1] = &ru1->ru_stime;	delta[1] = &ru2->ru_stime;

	for( i = 0; i < 2; i++ ) {
		total[i]->tv_usec += delta[i]->tv_usec;
		if( total[i]->tv_usec >= 1000000 ) {
			total[i]->tv_usec -= 1000000;
			total[i]->tv_sec += 1;
		}
		total[i]->tv_sec += delta[i]->tv_sec;
	}

	ru1->ru_maxrss = (ru2->ru_maxrss > ru1->ru_maxrss) ?
								ru2->ru_maxrss : ru1->ru_maxrss;
	ru1->ru_ixrss = (ru2->ru_ixrss > ru1->ru_ixrss) ?
								ru2->ru_ixrss : ru1->ru_ixrss;
	ru1->ru_idrss = (ru2->ru_idrss > ru1->ru_idrss) ?
								ru2->ru_idrss : ru1->ru_idrss;
	ru1->ru_isrss = (ru2->ru_isrss > ru1->ru_isrss) ?
								ru2->ru_isrss : ru1->ru_isrss;

	ru1->ru_minflt		+= ru2->ru_minflt;
	ru1->ru_majflt		+= ru2->ru_majflt;
	ru1->ru_nswap		+= ru2->ru_nswap;
	ru1->ru_inblock		+= ru2->ru_inblock;
	ru1->ru_oublock		+= ru2->ru_oublock;
	ru1->ru_msgsnd		+= ru2->ru_msgsnd;
	ru1->ru_msgrcv		+= ru2->ru_msgrcv;
	ru1->ru_nsignals	+= ru2->ru_nsignals;
	ru1->ru_nvcsw		+= ru2->ru_nvcsw;
	ru1->ru_nivcsw		+= ru2->ru_nivcsw;
}
/*
** Ask the condor_startd on `host` to run `proc`.
**
** Sends START_FRGN_JOB followed by the context from build_context(). On
** acceptance the startd replies with two port numbers:
**   - port1 is connected and dup'ed onto RSC_SOCK (remote system calls);
**   - port2 is connected and dup'ed onto CLIENT_LOG.
** xdr_RSC is then initialized on those descriptors, and the job's
** checkpoint file is sent with send_job_file().
**
** If the startd cannot be reached or refuses the job (reply != OK, or a
** zero port1), the job's queue entry is reset with DoCleanup() and the
** shadow exits 0.
*/
send_job( proc, host )
PROC	*proc;
char	*host;
{
	int			cmd = START_FRGN_JOB;
	int			reply;
	bool_t		xdr_ports();
	PORTS		ports;
	int			sock;
	XDR			xdr, *xdrs;
	CONTEXT		*context;

	dprintf( D_FULLDEBUG, "Shadow: Entering send_job()\n" );

	/* Connect to the startd */
	if( (sock = do_connect(host, "condor_startd", START_PORT)) < 0 ) {
		dprintf( D_ALWAYS, "Shadow: Can't connect to condor_startd on %s\n",
			host);
		DoCleanup();
		dprintf( D_ALWAYS, "********** Shadow Parent Exiting **********\n" );
		exit( 0 );
	}
	xdrs = xdr_Init( &sock, &xdr );
	xdrs->x_op = XDR_ENCODE;

	/* Send the command */
	if( !xdr_int(xdrs, &cmd) ) {
		EXCEPT( "xdr_int()" );
	}

	/* Send the job info */
	context = build_context( proc );
	if( !xdr_context(xdrs,context) ) {
		EXCEPT( "xdr_context(0x%x,0x%x)", xdrs, context );
	}
	if( !xdrrec_endofrecord(xdrs,TRUE) ) {
		EXCEPT( "xdrrec_endofrecord(TRUE)" );
	}
	free_context( context );

	xdrs->x_op = XDR_DECODE;
	ASSERT( xdr_int(xdrs,&reply) );
	ASSERT( xdrrec_skiprecord(xdrs) );

	if( reply != OK ) {
		dprintf( D_ALWAYS, "Shadow: Request to run a job was REFUSED\n" );
		DoCleanup();
		dprintf( D_ALWAYS, "********** Shadow Parent Exiting **********\n" );
		exit( 0 );
	}
	dprintf( D_ALWAYS, "Shadow: Request to run a job was ACCEPTED\n" );

	ASSERT( xdr_ports(xdrs,&ports) );

	if( ports.port1 == 0 ) {
		dprintf( D_ALWAYS, "Shadow: Request to run a job on %s was REFUSED\n",
																	host );
		/*
		** Same treatment as the other refusals above: reset the queue entry
		** (still RUNNING, as start_job() required) so the job can be
		** rescheduled instead of being stranded.
		*/
		DoCleanup();
		dprintf( D_ALWAYS, "********** Shadow Parent Exiting **********\n" );
		exit( 0 );
	}

	/*
	** NOTE(review): the startd socket is not closed here (a close(sock)
	** was commented out in the original) and is overwritten below --
	** confirm whether keeping it open is intentional.
	*/
	if( (sock = do_connect(host, (char *)0, (u_short)ports.port1)) < 0 ) {
		EXCEPT( "connect to scheduler on \"%s\", port1 = %d",
													host, ports.port1 );
	}
	if( sock != RSC_SOCK ) {
		ASSERT(dup2(sock, RSC_SOCK) >= 0);
		(void)close(sock);
	}

	if( (sock = do_connect(host, (char *)0, (u_short)ports.port2)) < 0 ) {
		EXCEPT( "connect to scheduler on \"%s\", port2 = %d",
													host, ports.port2 );
	}
	if( sock != CLIENT_LOG ) {
		ASSERT(dup2(sock, CLIENT_LOG) >= 0);
		(void)close(sock);
	}

	xdr_RSC = RSC_ShadowInit( RSC_SOCK, CLIENT_LOG );
	send_job_file( xdr_RSC, proc );
}
CONTEXT *
build_context( proc )
PROC *proc;
{
char line[1024];
CONTEXT *answer;
answer = create_context();
(void)sprintf( line, "JOB_REQUIREMENTS = (%s) && (Disk >= %d)",
proc->requirements, calc_disk_needed() );
store_stmt( scan(line), answer );
if( proc->preferences && proc->preferences[0] ) {
(void)sprintf( line, "JOB_PREFERENCES = %s", proc->preferences );
} else {
(void)sprintf( line, "JOB_PREFERENCES = T" );
}
store_stmt( scan(line), answer );
(void)sprintf( line, "Owner = \"%s\"", proc->owner );
store_stmt( scan(line), answer );
return answer;
}
/*
** Fill in the checkpoint path names for `proc`:
**   - CkptName:    <Spool>/jobCCCCCC.ckpt.P
**   - TmpCkptName: CkptName + ".tmp"
**   - ICkptName:   <Spool>/jobCCCCCC.ickpt
**   - RCkptName:   the path the job returns its checkpoint to. That is
**                  CkptName itself when rootdir is "/", otherwise
**                  /jobCCCCCC.ckpt.P relative to the job's root.
**
** It also looks up the uid/gid of the "condor" account and of the job's
** owner. EXCEPTs if either password entry is missing.
*/
get_client_ids( proc )
register PROC	*proc;
{
	struct passwd	*entry;

	(void)sprintf( CkptName, "%s/job%06d.ckpt.%d",
						Spool, proc->id.cluster, proc->id.proc );
	(void)sprintf( TmpCkptName, "%s.tmp", CkptName );
	(void)sprintf( ICkptName, "%s/job%06d.ickpt",
						Spool, proc->id.cluster );

	if( strcmp(proc->rootdir, "/") != 0 ) {
		(void)sprintf( RCkptName, "/job%06d.ckpt.%d",
						proc->id.cluster, proc->id.proc );
	} else {
		(void)strcpy(RCkptName, CkptName);	/* Return the checkpoint in place */
	}

	/* getpwnam() reuses a static buffer: copy out before the next call */
	if( (entry = getpwnam("condor")) == NULL ) {
		EXCEPT("Can't find password entry for user 'condor'");
	}
	CondorGid = entry->pw_gid;
	CondorUid = entry->pw_uid;

	if( (entry = getpwnam(proc->owner)) == NULL ) {
		EXCEPT("Can't find password entry for '%s'", proc->owner);
	}
	ClientUid = entry->pw_uid;
	ClientGid = entry->pw_gid;
}
/*
** Send the job to the remote side over `xdrs` as one XDR record containing:
**   - the PROC structure;
**   - the name the checkpoint should be returned under (RCkptName);
**   - the checkpoint's length, then its contents.
** The checkpoint is CkptName if it can be opened, else the initial
** checkpoint ICkptName.
**
** EXCEPTs if neither file can be opened, or on a read error. It also
** EXCEPTs if the file ends before the length taken from fstat() (for
** example because it shrank meanwhile); without that check the loop would
** spin forever on 0-byte reads.
*/
send_job_file( xdrs, proc )
XDR		*xdrs;
PROC	*proc;
{
	int			in_fd;
	int			len;
	struct stat	st_buf;
	char		buf[ XDR_BLOCKSIZ ];
	int			ask, received;
	char		*return_name;

	dprintf( D_ALWAYS, "Shadow: send_job_file( 0x%x, %d.%d, %s, %s, %s, %s )\n",
		xdrs, proc->id.cluster, proc->id.proc,
		proc->in, proc->out, proc->err, proc->args );
	dprintf(D_FULLDEBUG, "Shadow: send_job_file: RootDir = <%s>\n",
		proc->rootdir);

	in_fd = open( CkptName, O_RDONLY, 0 );
	if( in_fd < 0 ) {
		in_fd = open( ICkptName, O_RDONLY, 0 );
	}
	if( in_fd < 0 ) {
		EXCEPT( "open(\"%s\",O_RDONLY,0)", ICkptName );
	}

	if( fstat(in_fd,&st_buf) < 0 ) {
		EXCEPT( "fstat" );
	}
	len = st_buf.st_size;

#ifdef DAYAO
	StartRecording();
#endif /* DAYAO */

	xdrs->x_op = XDR_ENCODE;

	dprintf( D_FULLDEBUG, "Shadow: Sending proc structure\n" );
	ASSERT(xdr_proc(xdrs, proc));

	return_name = RCkptName;
	dprintf( D_FULLDEBUG, "Shadow: Sending name = '%s'\n", return_name );
	ASSERT( xdr_string(xdrs, &return_name, (u_int) MAXPATHLEN) );

	dprintf( D_FULLDEBUG, "Shadow: Sending len = %d\n", len );
	ASSERT( xdr_int(xdrs, &len) );

	while( len > 0 ) {
		ask = len < XDR_BLOCKSIZ ? len : XDR_BLOCKSIZ;
		if( (received=read(in_fd,buf,ask)) < 0 ) {
			EXCEPT( "read" );
		}
		if( received == 0 ) {
			/* Premature EOF: the advertised length can no longer be met */
			EXCEPT( "Checkpoint file ended %d bytes early", len );
		}
		ASSERT( xdr_opaque(xdrs, buf, (u_int)received) );
		len -= received;
	}
	ASSERT( xdrrec_endofrecord(xdrs, TRUE) );

#ifdef DAYAO
	CompleteRecording( st_buf.st_size );
#endif /* DAYAO */

	dprintf( D_ALWAYS, "Shadow: Done sending job file\n" );
	(void)close( in_fd );
}
extern char	*SigNames[];

/*
** SIGTERM handler installed by ParentShadow(). It:
**   - kills the child shadow, if there is one;
**   - unlinks the temporary and spooled checkpoint files;
**   - exits 1.
** For a chrooted job, TmpCkptName is first re-pointed at the checkpoint
** the job returns inside its root directory (the file MvTmpCkpt() copies
** from).
*/
rm()
{
	dprintf( D_ALWAYS, "Shadow: Got RM command *****\n" );

	if( ChildPid != 0 ) {
		(void) kill( ChildPid, SIGKILL );
	}

	if( strcmp(Proc.rootdir, "/") ) {
		(void)sprintf( TmpCkptName, "%s/job%06d.ckpt.%d",
						Proc.rootdir, Proc.id.cluster, Proc.id.proc );
	}

	(void) unlink( TmpCkptName );
	(void) unlink( CkptName );

	exit( 1 );
}
/*
** dprintf() identification hook (installed as DebugId in main()).
** Prints "(cluster.proc) (pid):", or "(?.?) (pid):" while no job id has
** been loaded into Proc.
*/
whoami( fp )
FILE	*fp;
{
	if( !Proc.id.cluster && !Proc.id.proc ) {
		fprintf( fp, "(?.?) (%d):", MyPid );
	} else {
		fprintf( fp, "(%d.%d) (%d):", Proc.id.cluster, Proc.id.proc, MyPid );
	}
}
/*
** Load job <cluster_id>.<proc_id> from the job queue into Proc (a side
** effect), under a WRITER lock.
**
** If the job is not RUNNING, the shadow exits 0 without any cleanup.
** Otherwise it:
**   - snapshots the job's usage totals into LocalUsage/RemoteUsage;
**   - closes the queue;
**   - computes the checkpoint names and uids/gids (get_client_ids);
**   - ships the job to `host` (send_job).
*/
start_job( host, cluster_id, proc_id )
char	*host;
char	*cluster_id;
char	*proc_id;
{
	int		cluster = atoi( cluster_id );
	int		procnum = atoi( proc_id );

	OPENJOBQUEUE(QueueD, QueueName, O_RDWR, 0);
	LockJobQueue( QueueD, WRITER );

	Proc.id.cluster = cluster;
	Proc.id.proc = procnum;
	if( FetchProc(QueueD,&Proc) < 0 ) {
		EXCEPT( "FetchProc(%d.%d)", Proc.id.cluster, Proc.id.proc );
	}

	if( Proc.status != RUNNING ) {
		dprintf( D_ALWAYS, "Shadow: Asked to run proc %d.%d, but status = %d\n",
							Proc.id.cluster, Proc.id.proc, Proc.status );
		dprintf(D_ALWAYS, "********** Shadow Parent Exiting **********\n" );
		exit( 0 );	/* don't cleanup here */
	}

	LocalUsage = Proc.local_usage;
	RemoteUsage = Proc.remote_usage;

	CLOSEJOBQUEUE( QueueD );

	get_client_ids( &Proc );
	send_job( &Proc, host );
}
/*
** Undo a failed or abandoned run. Also installed as the EXCEPT cleanup
** hook in main().
**
** Steps:
**   - Unlinks the temporary checkpoint.
**   - If a job id is loaded, re-reads the job under a WRITER lock.
**   - If it was removed by condor_rm (REMOVED, or no longer in the queue),
**     unlinks its spooled checkpoint.
**   - Otherwise marks it IDLE when a spooled checkpoint exists, or
**     UNEXPANDED when it does not, so it can be rescheduled.
**
** The shadow exits 1 when the queue cannot be opened, the job cannot be
** fetched, or the store fails.
*/
DoCleanup()
{
	char	ckpt_name[MAXPATHLEN];

	dprintf( D_FULLDEBUG, "Shadow: Entered DoCleanup()\n" );

	if( TmpCkptName[0] != '\0' ) {
		dprintf(D_ALWAYS, "Shadow: DoCleanup: unlinking TmpCkpt '%s'\n",
															TmpCkptName);
		(void) unlink( TmpCkptName );
	}

	if( Proc.id.cluster ) {
		/*
		** Build the checkpoint name before FetchProc(). Its failure path
		** below logs and unlinks ckpt_name, which was previously still
		** uninitialized at that point.
		*/
		(void)sprintf( ckpt_name, "%s/job%06d.ckpt.%d",
						Spool, Proc.id.cluster, Proc.id.proc );

		if( QueueD == NULL ) {
			if( (QueueD=OpenJobQueue(QueueName, O_RDWR, 0)) == NULL ) {
				dprintf( D_ALWAYS,"Shadow: OpenJobQUeue(%s) failed, errno %d\n",
													QueueName, errno );
				dprintf( D_ALWAYS,
					"********** Shadow Parent Exiting **********\n" );
				exit( 1 );
			}
		}
		LockJobQueue( QueueD, WRITER );

		if( FetchProc(QueueD,&Proc) < 0 ) {
			dprintf(D_ALWAYS, "Job %d.%d has been removed by condor_rm\n",
											Proc.id.cluster, Proc.id.proc );
			dprintf(D_ALWAYS, "Shadow: unlinking Ckpt '%s'\n", ckpt_name);
			(void) unlink( ckpt_name );
			dprintf( D_ALWAYS,"********** Shadow Parent Exiting **********\n" );
			exit( 1 );
		}

		if( Proc.status == REMOVED ) {
			dprintf(D_ALWAYS, "Job %d.%d has been removed by condor_rm\n",
											Proc.id.cluster, Proc.id.proc );
			dprintf(D_ALWAYS, "Shadow: unlinking Ckpt '%s'\n", ckpt_name);
			(void) unlink( ckpt_name );
			CLOSEJOBQUEUE( QueueD );
			return;
		}

		/* With no spooled checkpoint the job must start over from scratch */
		if( access(ckpt_name,F_OK) != 0 ) {
			Proc.status = UNEXPANDED;
		} else {
			Proc.status = IDLE;
		}

		if( StoreProc(QueueD,&Proc) < 0 ) {
			dprintf( D_ALWAYS, "Shadow: StoreProc(%d.%d) failed, errno = %d\n",
									Proc.id.cluster, Proc.id.proc, errno );
			dprintf(D_ALWAYS, "********** Shadow Parent Exiting **********\n" );
			exit( 1 );
		}
		CLOSEJOBQUEUE( QueueD );
		dprintf( D_ALWAYS, "Shadow: marked job status %d\n", Proc.status );
	}
}
/*
** Move a chrooted job's returned checkpoint into the spool.
**
** The source is <rootdir>/jobCCCCCC.ckpt.P. It is copied to
** <Spool>/jobCCCCCC.ckpt.P.tmp (TmpCkptName), which is then renamed over
** CkptName. The source is unlinked only after the rename succeeds, so a
** failed move never loses the checkpoint.
**
** Runs with egid CondorGid and euid ClientUid; these are not restored.
** Overwrites TmpCkptName and RCkptName, and EXCEPTs on any I/O failure.
*/
MvTmpCkpt()
{
	char			buf[ BUFSIZ * 8 ];
	register int	tfd, rfd, rcnt, wcnt;

	if( setegid(CondorGid) < 0 ) {
		EXCEPT("Cannot setegid(%d)", CondorGid);
	}
	if( seteuid(ClientUid) < 0 ) {
		EXCEPT("Cannot seteuid(%d)", ClientUid);
	}

	(void)sprintf( TmpCkptName, "%s/job%06d.ckpt.%d.tmp",
						Spool, Proc.id.cluster, Proc.id.proc );
	(void)sprintf( RCkptName, "%s/job%06d.ckpt.%d",
						Proc.rootdir, Proc.id.cluster, Proc.id.proc );

	/*
	** The paths include Proc.rootdir, so they must never be used as the
	** format string itself.
	*/
	rfd = open( RCkptName, O_RDONLY, 0 );
	if( rfd < 0 ) {
		EXCEPT( "open(%s)", RCkptName );
	}
	tfd = open(TmpCkptName, O_WRONLY|O_CREAT|O_TRUNC, 0775);
	if( tfd < 0 ) {
		EXCEPT( "open(%s)", TmpCkptName );
	}

	/* Copy until read() reports EOF; a short read is not the end of file */
	while( (rcnt = read(rfd, buf, sizeof(buf))) != 0 ) {
		if( rcnt < 0 ) {
			EXCEPT("read <%s>", RCkptName);
		}
		wcnt = write( tfd, buf, rcnt );
		if( wcnt != rcnt ) {
			EXCEPT("wcnt %d != rcnt %d", wcnt, rcnt);
		}
	}

	if( rename(TmpCkptName, CkptName) < 0 ) {
		EXCEPT("rename(%s, %s)", TmpCkptName, CkptName);
	}
	(void)unlink( RCkptName );

	/*
	** NOTE(review): the euid is ClientUid here, so giving the file to
	** CondorUid will normally fail (result ignored, as in the original).
	*/
	(void)fchown( tfd, CondorUid, CondorGid );
	(void)close( rfd );
	(void)close( tfd );
}
/*
** Intentionally empty.
** NOTE(review): presumably a stub satisfying a reference from a library
** linked into the shadow -- confirm.
*/
int
SetSyscalls()
{
}
/*
** Size, in whole kilobytes (rounded down), of the job's checkpoint.
** Uses CkptName if it can be stat'ed, otherwise the initial checkpoint
** ICkptName. EXCEPTs when neither can be stat'ed.
*/
calc_disk_needed( )
{
	struct stat	st;
	char		*candidates[2];
	int			i;

	candidates[0] = CkptName;
	candidates[1] = ICkptName;

	for( i = 0; i < 2; i++ ) {
		if( stat(candidates[i], &st) == 0 ) {
			return st.st_size / 1024;
		}
	}

	EXCEPT( "Can't stat checkpoint file \"%s\"", ICkptName );
#ifdef LINT
	return -1;
#endif /* LINT */
}
/*
** Replace descriptors 0, 1 and 2 with the job's stdin (proc->in, read-only),
** stdout (proc->out, write-only) and stderr (proc->err, write-only).
**
** Each open() must land on the expected descriptor. Because 0-2 are closed
** first and opened in order, each open() takes the lowest free slot.
**
** Any open failure is reported as a permission error (PERM_ERR) and is
** fatal (EXCEPT).
*/
open_std_files( proc )
PROC	*proc;
{
	int		fd;

	dprintf( D_FULLDEBUG, "Entered open_std_files()\n" );

	(void)close( 0 );
	(void)close( 1 );
	(void)close( 2 );

	if( (fd=open(proc->in,O_RDONLY,0)) < 0 ) {
		PERM_ERR( "Can't open \"%s\"", proc->in );
		EXCEPT( "open(%s,O_RDONLY,0)", proc->in );
	}
	if( fd != 0 ) {
		EXCEPT( "open returns %d, expected 0", fd );
	}

	if( (fd=open(proc->out,O_WRONLY,0)) < 0 ) {
		PERM_ERR( "Can't open \"%s\"", proc->out );
		EXCEPT( "open(%s,O_WRONLY,0)", proc->out );
	}
	if( fd != 1 ) {
		/* This check concerns stdout, so name proc->out (not proc->err) */
		dprintf( D_ALWAYS, "Can't open \"%s\"\n", proc->out );
		EXCEPT( "open returns %d, expected 1", fd );
	}

	if( (fd=open(proc->err,O_WRONLY,0)) < 0 ) {
		PERM_ERR( "Can't open \"%s\"", proc->err );
		EXCEPT( "open(%s,O_WRONLY,0)", proc->err );
	}
	if( fd != 2 ) {
		EXCEPT( "open returns %d, expected 2", fd );
	}

	dprintf( D_ALWAYS, "Opened \"%s\", \"%s\", and \"%s\"\n",
					proc->in, proc->out, proc->err );
}