home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / io / job / job-time-mgr.c < prev    next >
C/C++ Source or Header  |  2006-09-18  |  8KB  |  316 lines

  1. /*
  2.  * (C) 2001 Clemson University and The University of Chicago
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. #include <sys/time.h>
  8. #include <assert.h>
  9. #include <string.h>
  10.  
  11. #include "pvfs2-types.h"
  12. #include "job-desc-queue.h"
  13. #include "job.h"
  14. #include "quicklist.h"
  15. #include "gen-locks.h"
  16. #include "gossip.h"
  17. #include "job-time-mgr.h"
  18. #include "pvfs2-internal.h"
  19.  
  20. static QLIST_HEAD(bucket_queue);
  21. static gen_mutex_t bucket_mutex = GEN_MUTEX_INITIALIZER;
  22.  
  23. struct time_bucket
  24. {
  25.     long expire_time_sec;
  26.     struct qlist_head bucket_link;
  27.     struct qlist_head jd_queue;
  28. };
  29.  
  30. /* job_time_mgr_init()
  31.  *
  32.  * initialize timeout mgmt interface
  33.  *
  34.  * returns 0 on success, -PVFS_error on failure
  35.  */
  36. int job_time_mgr_init(void)
  37. {
  38.     INIT_QLIST_HEAD(&bucket_queue);
  39.     return(0);
  40. }
  41.  
  42. /* job_time_mgr_finalize()
  43.  *
  44.  * shut down timeout mgmt interface
  45.  *
  46.  * returns 0 on success, -PVFS_error on failure
  47.  */
  48. int job_time_mgr_finalize(void)
  49. {
  50.     struct qlist_head* iterator = NULL;
  51.     struct qlist_head* scratch = NULL;
  52.     struct qlist_head* iterator2 = NULL;
  53.     struct qlist_head* scratch2 = NULL;
  54.     struct time_bucket* tmp_bucket = NULL;
  55.     struct job_desc* jd = NULL;
  56.  
  57.     gen_mutex_lock(&bucket_mutex);
  58.  
  59.     qlist_for_each_safe(iterator, scratch, &bucket_queue)
  60.     {
  61.     tmp_bucket = qlist_entry(iterator, struct time_bucket, bucket_link);
  62.         assert(tmp_bucket);
  63.  
  64.     qlist_del(&tmp_bucket->bucket_link);
  65.     qlist_for_each_safe(iterator2, scratch2, &tmp_bucket->jd_queue)
  66.     {
  67.         jd = qlist_entry(iterator2, struct job_desc, job_time_link);
  68.         qlist_del(&jd->job_time_link);
  69.         jd->time_bucket = NULL;
  70.     }
  71.     INIT_QLIST_HEAD(&tmp_bucket->jd_queue);
  72.     free(tmp_bucket);
  73.     }
  74.     INIT_QLIST_HEAD(&bucket_queue);
  75.  
  76.     gen_mutex_unlock(&bucket_mutex);
  77.  
  78.     return(0);
  79. }
  80.  
  81. static int __job_time_mgr_add(struct job_desc* jd, int timeout_sec)
  82. {
  83.     struct timeval tv;
  84.     long expire_time_sec;
  85.     struct qlist_head* tmp_link;
  86.     struct time_bucket* tmp_bucket = NULL;
  87.     struct time_bucket* prev_bucket = NULL;
  88.     struct time_bucket* new_bucket = NULL;
  89.  
  90.     struct qlist_head* prev;
  91.     struct qlist_head* next;
  92.  
  93.     if(timeout_sec == JOB_TIMEOUT_INF)
  94.     {
  95.     /* do nothing */
  96.     return(0);
  97.     }
  98.  
  99.     gettimeofday(&tv, NULL);
  100.  
  101.     /* round up to the second that this job should expire */
  102.     expire_time_sec = tv.tv_sec + timeout_sec;
  103.  
  104.     if(jd->type == JOB_FLOW)
  105.     {
  106.     jd->u.flow.timeout_sec = timeout_sec;
  107.     }
  108.  
  109.     /* look for a bucket matching the desired seconds value */
  110.     qlist_for_each(tmp_link, &bucket_queue)
  111.     {
  112.         tmp_bucket = qlist_entry(
  113.             tmp_link, struct time_bucket, bucket_link);
  114.         assert(tmp_bucket);
  115.  
  116.         if(tmp_bucket->expire_time_sec >= expire_time_sec)
  117.         {
  118.             break;
  119.         }
  120.  
  121.         prev_bucket = tmp_bucket;
  122.         tmp_bucket = NULL;
  123.     }
  124.  
  125.     if(!tmp_bucket || tmp_bucket->expire_time_sec != expire_time_sec)
  126.     {
  127.     /* make a new bucket, we didn't find an exact match */
  128.     new_bucket = (struct time_bucket*)
  129.             malloc(sizeof(struct time_bucket));
  130.     assert(new_bucket);
  131.  
  132.     new_bucket->expire_time_sec = expire_time_sec;
  133.     INIT_QLIST_HEAD(&new_bucket->bucket_link);
  134.     INIT_QLIST_HEAD(&new_bucket->jd_queue);
  135.  
  136.     if(tmp_bucket)
  137.         next = &tmp_bucket->bucket_link;
  138.     else
  139.         next = &bucket_queue;
  140.     
  141.     if(prev_bucket)
  142.         prev = &prev_bucket->bucket_link;
  143.     else
  144.         prev = &bucket_queue;
  145.  
  146.     __qlist_add(&new_bucket->bucket_link, prev, next);
  147.  
  148.     tmp_bucket = new_bucket;
  149.     }
  150.  
  151.     assert(tmp_bucket);
  152.     assert(tmp_bucket->expire_time_sec >= expire_time_sec);
  153.  
  154.     /* add the job descriptor onto the correct bucket */
  155.     qlist_add_tail(&jd->job_time_link, &tmp_bucket->jd_queue);
  156.     jd->time_bucket = tmp_bucket;
  157.  
  158.     return(0);
  159. }
  160. /* job_time_mgr_add()
  161.  *
  162.  * adds a job to be monitored for timeout, timeout_sec is an interval in
  163.  * seconds
  164.  *
  165.  * return 0 on success, -PVFS_error on failure
  166.  */
  167. int job_time_mgr_add(struct job_desc* jd, int timeout_sec)
  168. {
  169.     int ret = -1;
  170.  
  171.     gen_mutex_lock(&bucket_mutex);
  172.  
  173.     ret = __job_time_mgr_add(jd, timeout_sec);
  174.  
  175.     gen_mutex_unlock(&bucket_mutex);
  176.  
  177.     return(ret);
  178. }
  179.  
  180. /* job_time_mgr_rem()
  181.  *
  182.  * remove a job from the set that is being monitored for timeout
  183.  *
  184.  * no return value
  185.  */
  186. void job_time_mgr_rem(struct job_desc* jd)
  187. {
  188.     struct time_bucket* tmp_bucket = NULL;
  189.  
  190.     gen_mutex_lock(&bucket_mutex);
  191.  
  192.     if(jd->time_bucket == NULL)
  193.     {
  194.     /* nothing to do, it is already removed */
  195.         gen_mutex_unlock(&bucket_mutex);
  196.     return;
  197.     }
  198.  
  199.     tmp_bucket = (struct time_bucket*)jd->time_bucket;
  200.  
  201.     if(qlist_empty(&tmp_bucket->jd_queue))
  202.     {
  203.     /* no need for this bucket any longer; it is empty */
  204.     qlist_del(&tmp_bucket->bucket_link);
  205.     INIT_QLIST_HEAD(&tmp_bucket->jd_queue);
  206.     free(tmp_bucket);
  207.     }
  208.     qlist_del(&jd->job_time_link);
  209.     jd->time_bucket = NULL;
  210.  
  211.     gen_mutex_unlock(&bucket_mutex);
  212.  
  213.     return;
  214. }
  215.  
  216. /* job_time_mgr_expire()
  217.  *
  218.  * look for expired jobs and cancel them
  219.  *
  220.  * returns 0 on success, -PVFS_error on failure
  221.  */
  222. int job_time_mgr_expire(void)
  223. {
  224.     struct timeval tv;
  225.     struct qlist_head* iterator = NULL;
  226.     struct qlist_head* scratch = NULL;
  227.     struct qlist_head* iterator2 = NULL;
  228.     struct qlist_head* scratch2 = NULL;
  229.     struct time_bucket* tmp_bucket = NULL;
  230.     struct job_desc* jd = NULL;
  231.     int ret = -1;
  232.     PVFS_size tmp_size = 0;
  233.  
  234.     gettimeofday(&tv, NULL);
  235.  
  236.     gen_mutex_lock(&bucket_mutex);
  237.  
  238.     qlist_for_each_safe(iterator, scratch, &bucket_queue)
  239.     {
  240.     tmp_bucket = qlist_entry(iterator, struct time_bucket, bucket_link);
  241.         assert(tmp_bucket);
  242.  
  243.     /* stop when we see the first bucket that has not expired */
  244.     if(tmp_bucket->expire_time_sec > tv.tv_sec)
  245.     {
  246.         break;
  247.     }
  248.  
  249.     /* cancel the associated jobs and remove the bucket */
  250.     qlist_for_each_safe(iterator2, scratch2, &tmp_bucket->jd_queue)
  251.     {
  252.         jd = qlist_entry(iterator2, struct job_desc, job_time_link);
  253.         qlist_del(&jd->job_time_link);
  254.  
  255.         switch(jd->type)
  256.         {
  257.         case JOB_BMI:
  258.         gossip_err("%s: job time out: cancelling bmi operation, job_id: %llu.\n", __func__, llu(jd->job_id));
  259.         ret = job_bmi_cancel(jd->job_id, jd->context_id);
  260.             jd->time_bucket = NULL;
  261.         break;
  262.         case JOB_FLOW:
  263.         /* have we made any progress since last time we checked? */
  264.         PINT_flow_getinfo(jd->u.flow.flow_d,
  265.                                   FLOW_AMT_COMPLETE_QUERY, &tmp_size);
  266.         if(tmp_size > jd->u.flow.last_amt_complete)
  267.         {
  268.             /* if so, then update progress and reset timer */
  269.             jd->u.flow.last_amt_complete = tmp_size;
  270.             __job_time_mgr_add(jd, jd->u.flow.timeout_sec);
  271.             ret = 0;
  272.         }
  273.         else
  274.         {
  275.             /* otherwise kill the flow */
  276.             gossip_err("%s: job time out: cancelling flow operation, job_id: %llu.\n", __func__, llu(jd->job_id));
  277.             ret = job_flow_cancel(jd->job_id, jd->context_id);
  278.                 jd->time_bucket = NULL;
  279.         }
  280.         break;
  281.         case JOB_TROVE:
  282.         gossip_err("%s: job time out: cancelling trove operation, job_id: %llu.\n", __func__, llu(jd->job_id));
  283.         ret = job_trove_dspace_cancel(
  284.                     jd->u.trove.fsid, jd->job_id, jd->context_id);
  285.             jd->time_bucket = NULL;
  286.                 break;
  287.         default:
  288.         ret = 0;
  289.             jd->time_bucket = NULL;
  290.         break;
  291.         }
  292.  
  293.         /* FIXME: error handling */
  294.         assert(ret == 0);
  295.     }
  296.     qlist_del(&tmp_bucket->bucket_link);
  297.         INIT_QLIST_HEAD(&tmp_bucket->jd_queue);
  298.     free(tmp_bucket);
  299.     }
  300.  
  301.     gen_mutex_unlock(&bucket_mutex);
  302.  
  303.     return(0);
  304. }
  305.  
  306.  
  307. /*
  308.  * Local variables:
  309.  *  c-indent-level: 4
  310.  *  c-basic-offset: 4
  311.  * End:
  312.  *
  313.  * vim: ts=8 sts=4 sw=4 expandtab
  314.  */
  315.  
  316.