home *** CD-ROM | disk | FTP | other *** search
/ The World of Computer Software / World_Of_Computer_Software-02-385-Vol-1of3.iso / c / condor40.zip / CONDOR / src / util_lib / job_queue.c < prev    next >
C/C++ Source or Header  |  1989-05-15  |  13KB  |  602 lines

  1. /* 
  2. ** Copyright 1986, 1987, 1988, 1989 University of Wisconsin
  3. ** 
  4. ** Permission to use, copy, modify, and distribute this software and its
  5. ** documentation for any purpose and without fee is hereby granted,
  6. ** provided that the above copyright notice appear in all copies and that
  7. ** both that copyright notice and this permission notice appear in
  8. ** supporting documentation, and that the name of the University of
  9. ** Wisconsin not be used in advertising or publicity pertaining to
  10. ** distribution of the software without specific, written prior
  11. ** permission.  The University of Wisconsin makes no representations about
  12. ** the suitability of this software for any purpose.  It is provided "as
  13. ** is" without express or implied warranty.
  14. ** 
  15. ** THE UNIVERSITY OF WISCONSIN DISCLAIMS ALL WARRANTIES WITH REGARD TO
  16. ** THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
  17. ** FITNESS. IN NO EVENT SHALL THE UNIVERSITY OF WISCONSIN  BE LIABLE FOR
  18. ** ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  19. ** WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  20. ** ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  21. ** OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  22. ** 
  23. ** Authors:  Allan Bricker and Michael J. Litzkow,
  24. **              University of Wisconsin, Computer Sciences Dept.
  25. ** 
  26. */ 
  27.  
  28.  
  29. /**********************************************************************
  30. **    These routines provide access to the CONDOR job queue.  The queue consists
  31. **    of clusters of related processes.  All processes in a cluster share the
  32. **    same executable, (and thus the same initial checkpoint), and are submitted
  33. **    as a group via a single command file.  Individual processes are identified
  34. **    by a cluster_id/proc_id tuple.  Cluster id's are unique for all time,
  35. **    and process id's are 0 - n where there are n processes in the cluster.
  36. **    Individual processes are not removed from a cluster, they are just marked
  37. **    as "completed", "deleted", "running", "idle", etc.  Clusters are removed
  38. **    when all processes in the cluster are completed, but the cluster id's are
  39. **    not re-used.
  40. **
  41. **    Operations:
  42. **
  43. **        Q = OpenJobQueue( pathname, flags, mode )
  44. **            char    *pathname;
  45. **            DBM        *Q;
  46. **
  47. **        CloseJobQueue( Q )
  48. **            DBM        *Q;
  49. **
  50. **        LockJobQueue( Q, operation )
  51. **            DBM        *Q;
  52. **            int        operation;
  53. **
  54. **        ClusterId = CreateCluster( Q ) 
  55. **            DBM        *Q;
  56. **            int        ClusterId;
  57. **
  58. **        StoreProc( Q, Proc )
  59. **            DBM        *Q;
  60. **            PROC    *Proc;
  61. **
  62. **        Proc = FetchProc( Q, id )
  63. **            DBM        *Q;
  64. **            PROC_ID    *id;
  65. **            PROC    *Proc;
  66. **
  67. **        ScanJobQueue( Q, func )
  68. **            DBM        *Q;
  69. **            int        (*func)();
  70. **
  71. **        ScanCluster( Q, cluster, func )
  72. **            DBM        *Q;
  73. **            int        cluster;
  74. **            int        (*func)();
  75. **
  76. **        TerminateCluster( Q, cluster, status )
  77. **            DBM        *Q;
  78. **            int        cluster;
  79. **            int        status;
  80. **        
  81. **        TerminateProc( Q, pid, status )
  82. **            DBM        *Q;
  83. **            PROC_ID    *pid;
  84. **            int        status;
  85. **        
  86. **    The low level database operations are accomplished by ndbm(3) using
  87. **    XDR to pack/unpack the Proc structs to/from contiguous areas of memory.
  88. **    The locking is accomplished by flock(3).
  89. **************************************************************************/
  90.  
  91. #include <stdio.h>
  92. #include <errno.h>
  93. #include <sys/file.h>
  94. #include <rpc/types.h>
  95. #include <rpc/xdr.h>
  96. #include <sys/param.h>
  97. #include <sys/time.h>
  98. #include <sys/resource.h>
  99. #include "proc.h"
  100. #include "debug.h"
  101. #include "except.h"
  102. #include "trace.h"
  103. #include "clib.h"
  104.  
  105. #ifdef NDBM
  106. #include <ndbm.h>
  107. #else NDBM
  108. #include "ndbm_fake.h"
  109. #endif NDBM
  110.  
  111. static char *_FileName_ = __FILE__;        /* Used by EXCEPT (see except.h)     */
  112. char    *Spool;
  113.  
  114. char    *malloc();
  115.  
  116. #define LIST_INCR    5
  117.  
  118. static PROC_ID ClusterId = { 0, 0 };
  119. static datum    ClusterKey = { (char *)&ClusterId, sizeof(ClusterId) };
  120.  
  121. /*
  122. ** Prepare the local job queue for reading/writing with these routines.
  123. */
  124. DBM    *
  125. OpenJobQueue( path, flags, mode )
  126. char    *path;
  127. {
  128.     return dbm_open(path,flags,mode);
  129. }
  130.  
  131. /*
  132. ** Close the job queue.
  133. */
  134. CloseJobQueue( Q )
  135. DBM        *Q;
  136. {
  137.     dbm_close( Q );
  138. }
  139.  
  140. /*
  141. ** Put a reader's/writer's lock on an open job queue.
  142. */
  143. LockJobQueue( Q, op )
  144. DBM        *Q;
  145. int        op;
  146. {
  147.     while( flock(Q->dbm_pagf,op) < 0 ) {
  148.         if( errno != EINTR ) {
  149.             EXCEPT("flock(%d,0%o) -- Q = 0x%x", Q->dbm_pagf, op, Q );
  150.         }
  151.     }
  152. }
  153.  
  154. /*
  155. ** Allocate a new cluster id, and store it in the list of active
  156. ** cluster id's.
  157. */
  158. CreateCluster( Q )
  159. DBM        *Q;
  160. {
  161.     CLUSTER_LIST    *list, *fetch_cluster_list(), *add_new_cluster();
  162.     int                answer;
  163.  
  164.     list = fetch_cluster_list( Q );
  165.  
  166.     list = add_new_cluster( list );
  167.     answer = list->next_id - 1;
  168.  
  169.     store_cluster_list( Q, list );
  170.  
  171.     return answer;
  172. }
  173.  
  174. /*
  175. ** Store a PROC in the database.
  176. */
  177. StoreProc( Q, proc )
  178. DBM        *Q;
  179. PROC    *proc;
  180. {
  181.     char    buf[10 * MAXPATHLEN];
  182.     XDR        xdr, *xdrs = &xdr;
  183.     datum    key;
  184.     datum    data;
  185.  
  186.         /* Use XDR to pack the structure into a contiguous buffer */
  187.     xdrmem_create( xdrs, buf, sizeof(buf), XDR_ENCODE );
  188.     ASSERT( xdr_proc(xdrs,proc) );
  189.  
  190.         /* Store it with dbm */
  191.     key.dptr = (char *)&proc->id;
  192.     key.dsize = sizeof( PROC_ID );
  193.     data.dptr = buf;
  194.     data.dsize = xdrs->x_private - xdrs->x_base;
  195.     return dbm_store(Q,key,data,DBM_REPLACE);
  196. }
  197.  
  198. /*
  199. ** We are given a process structure with the id filled in.  We look up
  200. ** the id in the database, and fill in the rest of the structure.
  201. */
  202. FetchProc( Q, proc )
  203. DBM        *Q;
  204. PROC    *proc;
  205. {
  206.     XDR        xdr, *xdrs = &xdr;
  207.     datum    key;
  208.     datum    data;
  209.  
  210.         /* Fetch the database entry */
  211.     key.dptr = (char *)&proc->id;
  212.     key.dsize = sizeof( PROC_ID );
  213.     data = dbm_fetch( Q, key );
  214.     if( !data.dptr ) {
  215.         return -1;
  216.     }
  217.  
  218.         /* Use XDR to unpack the structure from the contiguous buffer */
  219.     xdrmem_create( xdrs, data.dptr, data.dsize, XDR_DECODE );
  220.     bzero( (char *)proc, sizeof(PROC) );    /* let XDR allocate all the space */
  221.     if( !xdr_proc(xdrs,proc) ) {
  222.         EXCEPT( "Can't unpack proc %d.%d", proc->id.cluster, proc->id.proc );
  223.     }
  224.     return 0;
  225. }
  226.  
  227. /*
  228. ** Apply the given function to every process in the queue.
  229. */
  230. ScanJobQueue( Q, func )
  231. DBM        *Q;
  232. int        (*func)();
  233. {
  234.     int                i;
  235.     CLUSTER_LIST    *list;
  236.  
  237.     list = fetch_cluster_list( Q );
  238.  
  239.     for( i=0; i<list->n_ids; i++ ) {
  240.         ScanCluster( Q, list->id[i], func );
  241.     }
  242.  
  243.     free( (char *)list );
  244. }
  245.  
  246. /*
  247. ** Apply the function to every process in the specified cluster.
  248. */
  249. ScanCluster( Q, cluster, func )
  250. DBM        *Q;
  251. int        cluster;
  252. int        (*func)();
  253. {
  254.     PROC    proc;
  255.     int        proc_id;
  256.  
  257.     proc.id.cluster = cluster;
  258.     for( proc_id = 0; ;proc_id++) {
  259.         proc.id.proc = proc_id;
  260.         if( FetchProc(Q,&proc) < 0 ) {
  261.             return;
  262.         }
  263.         func( &proc );
  264.         xdr_free_proc( &proc );
  265.     }
  266. }
  267.  
  268. static DBM    *CurJobQ;
  269. static int    CurStatus;
  270. static XDR    *CurHistory;
  271. /*
  272. ** Terminate every process in the cluster with the given status.  This
  273. ** includes storing the proc structs in the history file and removing
  274. ** the cluster from the active cluster list.
  275. */
  276. TerminateCluster( Q, cluster, status )
  277. DBM        *Q;
  278. int        cluster;
  279. int        status;
  280. {
  281.     int        terminate();
  282.     XDR        xdr, *OpenHistory();
  283.     int        fd;
  284.     char    ickpt_name[MAXPATHLEN];
  285.  
  286.     CurJobQ = Q;
  287.     CurStatus = status;
  288.     CurHistory = OpenHistory( param("HISTORY"), &xdr, &fd );
  289.     (void)LockHistory( CurHistory, WRITER );
  290.  
  291.     ScanCluster( Q, cluster, terminate );
  292.  
  293.     CloseHistory( CurHistory );
  294.     delete_cluster( Q, cluster );
  295.  
  296.     (void)sprintf( ickpt_name, "%s/job%06d.ickpt", Spool, cluster );
  297.     (void)unlink( ickpt_name );
  298.  
  299.     CurJobQ = NULL;
  300. }
  301.  
  302. static int    EmptyCluster;
  303. /*
  304. ** Terminate a particular process.  We also check to see if this is the last
  305. ** active process in the cluster, and if so, we terminate the cluster.
  306. */
  307. TerminateProc( Q, pid, status )
  308. DBM        *Q;
  309. PROC_ID    *pid;
  310. int        status;
  311. {
  312.     PROC    proc;
  313.     int        check_empty_cluster();
  314.     char    ckpt_name[MAXPATHLEN];
  315.  
  316.     proc.id.cluster = pid->cluster;
  317.     proc.id.proc = pid->proc;
  318.     if( FetchProc(Q,&proc) != 0 ) {
  319.         printf( "Process %d.%d doesn't exist\n",
  320.                                             proc.id.cluster, proc.id.proc );
  321.         return -1;
  322.     }
  323.  
  324.     proc.status = status;
  325.     if( StoreProc(Q,&proc) != 0 ) {
  326.         EXCEPT( "Can't store process struct %d.%d\n",
  327.                                             proc.id.cluster, proc.id.proc );
  328.     }
  329.  
  330.     (void)sprintf( ckpt_name, "%s/job%06d.ckpt.%d",
  331.                                     Spool, proc.id.cluster, proc.id.proc );
  332.     (void)unlink( ckpt_name );
  333.     xdr_free_proc( &proc );
  334.  
  335.     EmptyCluster = TRUE;
  336.     ScanCluster( Q, pid->cluster, check_empty_cluster );
  337.  
  338.     if( EmptyCluster ) {
  339.         TerminateCluster( Q, pid->cluster, -1 );
  340.     }
  341.     return 0;
  342. }
  343.  
  344. /*
  345. ** Store the given cluster list in the database.  The list we are given
  346. ** is always malloc'd, so we free it here.  ROUTINES WHICH CALL
  347. ** THIS ONE MUST NOT MAKE ANY USE OF THE LIST AFTERWARD!
  348. */
  349. static
  350. store_cluster_list( Q, list )
  351. DBM                *Q;
  352. CLUSTER_LIST    *list;
  353. {
  354.     datum    data;
  355.  
  356.     data.dptr = (char *)list;
  357.     data.dsize = sizeof(CLUSTER_LIST) + (list->array_len - 1) * sizeof(int);
  358.  
  359.     if( dbm_store(Q,ClusterKey,data,DBM_REPLACE) < 0 ) {
  360.         EXCEPT( "dbm_store" );
  361.     }
  362.     free( (char *)list );
  363. }
  364.  
  365. /*
  366. ** Fetch a cluster list from the database.  If none is there, create a
  367. ** new and empty one.  In either case the list we give back is always
  368. ** malloc'd, and should be free'd later.
  369. */
  370. CLUSTER_LIST *
  371. fetch_cluster_list( Q )
  372. DBM        *Q;
  373. {
  374.     datum    data;
  375.     CLUSTER_LIST    *answer, *make_cluster_list(), *copy_cluster_list();
  376.  
  377.     data = dbm_fetch( Q, ClusterKey );
  378.     if( data.dptr ) {
  379.         answer = copy_cluster_list( (CLUSTER_LIST *)data.dptr );
  380.     } else {
  381.         answer = make_cluster_list( LIST_INCR );
  382.     }
  383.  
  384.     return answer;
  385. }
  386.  
  387. /*
  388. ** Increase the size of a cluster list by (incr).  The list we are given
  389. ** is always malloc'd, so we can use realloc for this.
  390. */
  391. static
  392. CLUSTER_LIST *
  393. grow_cluster_list( list, incr )
  394. CLUSTER_LIST    *list;
  395. int                incr;
  396. {
  397.     list->array_len += incr;
  398.  
  399.     return (CLUSTER_LIST *)realloc( (char *)list,
  400.         (unsigned)(sizeof(CLUSTER_LIST) + (list->array_len-1) * sizeof(int)) );
  401. }
  402.  
  403. /*
  404. ** Return a new and empty cluster list in a malloc'd area.
  405. */
  406. static
  407. CLUSTER_LIST    *
  408. make_cluster_list( len )
  409. int        len;
  410. {
  411.     CLUSTER_LIST    *answer;
  412.  
  413.     answer = (CLUSTER_LIST *)malloc( (unsigned)(sizeof(CLUSTER_LIST) +
  414.                                             (len - 1) * sizeof(int)) );
  415.     if( answer == NULL ) {
  416.         return NULL;
  417.     }
  418.     answer->n_ids = 0;
  419.     answer->array_len = len;
  420.     answer->next_id = 1;
  421.     return answer;
  422. }
  423.  
  424. static
  425. CLUSTER_LIST *
  426. add_new_cluster( list )
  427. CLUSTER_LIST    *list;
  428. {
  429.     CLUSTER_LIST    *grow_cluster_list();
  430.  
  431.     if( list->n_ids == list->array_len ) {
  432.         list = grow_cluster_list( list, LIST_INCR );
  433.     }
  434.     list->id[ list->n_ids++ ] = list->next_id++;
  435.     return list;
  436. }
  437.  
  438. /*
  439. ** When we fetch a cluster list from the database, we get a pointer into
  440. ** the database's private buffer.  Here we copy it into our own malloc'd
  441. ** space so we don't have to worry about the database overwriting it
  442. ** later.
  443. */
  444. static
  445. CLUSTER_LIST *
  446. copy_cluster_list( list )
  447. CLUSTER_LIST    *list;
  448. {
  449.     CLUSTER_LIST    *answer;
  450.  
  451.         /* Malloc space for the new one */
  452.     answer = (CLUSTER_LIST *)malloc( (unsigned)(sizeof(CLUSTER_LIST) +
  453.                                 (list->array_len - 1) * sizeof(int)) );
  454.  
  455.         /* Copy in the old one */
  456.     bcopy( (char *)list, (char *)answer,
  457.         (unsigned)(sizeof(CLUSTER_LIST) + (list->array_len-1) * sizeof(int)) );
  458.     
  459.     return answer;
  460. }
  461.  
  462.  
  463. static
  464. terminate( p )
  465. PROC    *p;
  466. {
  467.     datum    key;
  468.  
  469.     if( CurStatus >= 0 ) {
  470.         p->status = CurStatus;
  471.     }
  472.     if( p->status != SUBMISSION_ERR ) {
  473.         AppendHistory( CurHistory, p );
  474.     }
  475.     key.dptr = (char *)&p->id;
  476.     key.dsize = sizeof( PROC_ID );
  477.     if( dbm_delete(CurJobQ,key) != 0 ) {
  478.         fprintf( stderr, "Can't delete process %d.%d\n",
  479.                                             p->id.cluster, p->id.proc );
  480.     }
  481. }
  482.  
  483. static
  484. check_empty_cluster( p )
  485. PROC    *p;
  486. {
  487.     switch( p->status ) {
  488.         case UNEXPANDED:
  489.         case IDLE:
  490.         case RUNNING:
  491.             EmptyCluster = FALSE;
  492.             break;
  493.         case REMOVED:
  494.         case COMPLETED:
  495.         default:
  496.             break;
  497.     }
  498. }
  499.  
  500. /*
  501. ** Delete a cluster from the cluster list.  This DOES NOT delete individual
  502. ** processes within the structure.  That should be done by a higher level
  503. ** routine which also calls this one.
  504. */
  505. static
  506. delete_cluster( Q, cluster )
  507. DBM        *Q;
  508. int        cluster;
  509. {
  510.     CLUSTER_LIST    *list, *fetch_cluster_list();
  511.     int                src, dst;
  512.  
  513.     list = fetch_cluster_list( Q );
  514.  
  515.     for( src = 0, dst = 0; src < list->n_ids; ) {
  516.         if( list->id[src] != cluster ) {
  517.             list->id[dst++] = list->id[src++];
  518.         } else {
  519.             src++;
  520.         }
  521.     }
  522.     list->n_ids = dst;
  523.  
  524.     store_cluster_list( Q, list );
  525. }
  526.  
  527.  
  528. /*
  529. ** The current version of ULTRIX doesn't have ndbm.  We'll build a subset
  530. ** of that interface here with some loss of functionality.  We won't be
  531. ** able to handle multiple open databases, so we'll sacrifice the history
  532. ** keeping function for ULTRIX installations.  If ULTRIX ever comes
  533. ** up with an ndbm type of database library, this can all go away.
  534. */
  535.  
  536. #ifndef NDBM
  537.  
  538.  
  539. DBM *
  540. dbm_open( file, flags, mode )
  541. char    *file;
  542. int        flags;
  543. int        mode;
  544. {
  545.     static DBM dbm;
  546.     char    name[MAXPATHLEN];
  547.  
  548.     (void)sprintf( name, "%s.dir", file );
  549.     if( (dbm.dbm_dirf = open(name,flags,mode)) < 0 ) {
  550.         return NULL;
  551.     }
  552.  
  553.     (void)sprintf( name, "%s.pag", file );
  554.     if( (dbm.dbm_pagf = open(name,flags,mode)) < 0 ) {
  555.         (void)close(dbm.dbm_dirf);
  556.         return NULL;
  557.     }
  558.  
  559.     if( dbminit(file) < 0 ) {
  560.         return NULL;
  561.     }
  562.  
  563.     return &dbm;
  564. }
  565.  
  566. dbm_close( Q )
  567. DBM    *Q;
  568. {
  569.     (void)close( Q->dbm_pagf );
  570.     (void)close( Q->dbm_dirf );
  571.     dbmclose();
  572. }
  573.  
  574. /* ARGSUSED */
  575. dbm_store(Q,key,data,flags)
  576. DBM        *Q;
  577. datum    key;
  578. datum    data;
  579. int        flags;
  580. {
  581.     return store( key, data );
  582. }
  583.  
  584. /* ARGSUSED */
  585. datum
  586. dbm_fetch( Q, key )
  587. DBM        *Q;
  588. datum    key;
  589. {
  590.     return fetch( key );
  591. }
  592.  
  593. /* ARGSUSED */
  594. dbm_delete( Q, key )
  595. DBM        *Q;
  596. datum    key;
  597. {
  598.     return delete( key );
  599. }
  600.  
  601. #endif NOT NDBM
  602.