home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / io / job / thread-mgr.c < prev    next >
C/C++ Source or Header  |  2010-12-21  |  22KB  |  873 lines

  1. /*
  2.  * (C) 2001 Clemson University and The University of Chicago
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. #include <stdlib.h>
  8. #include <string.h>
  9. #include <assert.h>
  10.  
  11. #include "pvfs2-types.h"
  12. #include "thread-mgr.h"
  13. #include "gen-locks.h"
  14. #include "gossip.h"
  15. #include "bmi.h"
  16. #include "trove.h"
  17. #include "pvfs2-internal.h"
  18.  
  19. #include "pint-event.h"
  20. #include <stdio.h>
  21.  
  22. #define THREAD_MGR_TEST_COUNT 5
  23. #define THREAD_MGR_TEST_TIMEOUT 10
  24. static int thread_mgr_test_timeout = THREAD_MGR_TEST_TIMEOUT;
  25.  
  26. /* TODO: organize this stuff better */
  27. static void *bmi_thread_function(void *ptr);
  28. static void *trove_thread_function(void *ptr);
  29. static void *dev_thread_function(void *ptr);
  30. static struct BMI_unexpected_info stat_bmi_unexp_array[THREAD_MGR_TEST_COUNT];
  31. static bmi_op_id_t stat_bmi_id_array[THREAD_MGR_TEST_COUNT];
  32. static bmi_error_code_t stat_bmi_error_code_array[THREAD_MGR_TEST_COUNT];
  33. static bmi_size_t stat_bmi_actual_size_array[THREAD_MGR_TEST_COUNT];
  34. static void *stat_bmi_user_ptr_array[THREAD_MGR_TEST_COUNT];
  35. static TROVE_op_id stat_trove_id_array[THREAD_MGR_TEST_COUNT];
  36. static void *stat_trove_user_ptr_array[THREAD_MGR_TEST_COUNT];
  37. static TROVE_ds_state stat_trove_error_code_array[THREAD_MGR_TEST_COUNT];
  38. static gen_mutex_t bmi_mutex = GEN_MUTEX_INITIALIZER;
  39. static gen_mutex_t trove_mutex = GEN_MUTEX_INITIALIZER;
  40. static gen_mutex_t dev_mutex = GEN_MUTEX_INITIALIZER;
  41. static int bmi_unexp_count = 0;
  42. static int dev_unexp_count = 0;
  43. static void (*bmi_unexp_fn)(struct BMI_unexpected_info* unexp);
  44. static void (*dev_unexp_fn)(struct PINT_dev_unexp_info* unexp);
  45. static bmi_context_id global_bmi_context = -1;
  46. static TROVE_context_id global_trove_context = -1;
  47. static int bmi_thread_ref_count = 0;
  48. static int trove_thread_ref_count = 0;
  49. static int dev_thread_ref_count = 0;
  50. static PVFS_fs_id HACK_fs_id = 9; /* TODO: fix later */
  51. static struct PINT_dev_unexp_info stat_dev_unexp_array[THREAD_MGR_TEST_COUNT];
  52. #ifdef __PVFS2_JOB_THREADED__
  53. static pthread_t bmi_thread_id;
  54. static pthread_t trove_thread_id;
  55. static pthread_t dev_thread_id;
  56.  
  57. static pthread_cond_t bmi_test_cond = PTHREAD_COND_INITIALIZER;
  58. static pthread_cond_t trove_test_cond = PTHREAD_COND_INITIALIZER;
  59. static pthread_cond_t dev_unexp_test_cond = PTHREAD_COND_INITIALIZER;
  60. #endif /* __PVFS2_JOB_THREADED__ */
  61.  
  62. /* used to indicate that a bmi testcontext is in progress; we can't simply
  63.  * hold a lock while calling bmi testcontext for performance reasons
  64.  * (particularly under NPTL)
  65.  */
  66. static gen_mutex_t bmi_test_mutex = GEN_MUTEX_INITIALIZER;
  67. static int bmi_test_flag = 0;
  68. static int bmi_test_cancel_waiter = 0;
  69. static int bmi_test_count = 0;
  70. static gen_mutex_t trove_test_mutex = GEN_MUTEX_INITIALIZER;
  71. static int trove_test_flag = 0;
  72. static int trove_test_count = 0;
  73.  
  74. static int bmi_thread_running = 0;
  75. static int trove_thread_running = 0;
  76. static int dev_thread_running = 0;
  77.  
  78. static gen_mutex_t bmi_thread_running_mutex = GEN_MUTEX_INITIALIZER;
  79.  
  80. /* trove_thread_function()
  81.  *
  82.  * function executed by the thread in charge of trove
  83.  */
  84. static void *trove_thread_function(void *ptr)
  85. {
  86.     int ret = -1;
  87.     int i=0;
  88.     struct PINT_thread_mgr_trove_callback *tmp_callback;
  89.     int timeout = thread_mgr_test_timeout;
  90.  
  91. #ifdef __PVFS2_JOB_THREADED__
  92.     PINT_event_thread_start("TROVE");
  93.     while (trove_thread_running)
  94. #endif
  95.     {
  96.     /* indicate that a test is in progress */
  97.     gen_mutex_lock(&trove_test_mutex);
  98.     trove_test_flag = 1;
  99.     gen_mutex_unlock(&trove_test_mutex);
  100.     
  101.     trove_test_count = THREAD_MGR_TEST_COUNT;
  102. #ifdef __PVFS2_TROVE_SUPPORT__
  103.     ret = trove_dspace_testcontext(HACK_fs_id,
  104.         stat_trove_id_array,
  105.         &trove_test_count,
  106.         stat_trove_error_code_array,
  107.         stat_trove_user_ptr_array,
  108.         timeout,
  109.         global_trove_context);
  110. #else
  111.     timeout = 0;
  112.     stat_trove_id_array[0] = 0;
  113.     HACK_fs_id = 0;
  114.     assert(0);
  115. #endif
  116.     gen_mutex_lock(&trove_test_mutex);
  117.     trove_test_flag = 0;
  118. #ifdef __PVFS2_JOB_THREADED__
  119.     pthread_cond_signal(&trove_test_cond);
  120. #endif
  121.     gen_mutex_unlock(&trove_test_mutex);
  122.  
  123.     if (ret < 0)
  124.     {
  125.         PVFS_perror_gossip("critical Trove failure.\n", ret);
  126. #ifdef __PVFS2_JOB_THREADED__
  127.             gossip_err("trove_thread_function thread terminating\n");
  128.             break;
  129. #endif
  130.             return NULL;
  131.     }
  132.  
  133.     for(i=0; i<trove_test_count; i++)
  134.     {
  135.         /* execute a callback for each completed BMI operation */
  136.         tmp_callback =  (struct PINT_thread_mgr_trove_callback*)
  137.                 stat_trove_user_ptr_array[i];
  138.  
  139.             if (!tmp_callback || !tmp_callback->fn)
  140.             {
  141.                 gossip_err("critical Trove failure (null callback)\n");
  142. #ifdef __PVFS2_JOB_THREADED__
  143.                 gossip_err("trove_thread_function thread terminating\n");
  144.                 break;
  145. #endif
  146.                 continue;
  147.             }
  148.         tmp_callback->fn(tmp_callback->data,
  149.                              stat_trove_error_code_array[i]);
  150.     }
  151.     }
  152. #ifdef __PVFS2_JOB_THREADED__
  153.     PINT_event_thread_stop();
  154. #endif
  155.     return (NULL);
  156. }
  157.  
  158. /* bmi_thread_function()
  159.  *
  160.  * function executed by the thread in charge of BMI
  161.  */
  162. static void *bmi_thread_function(void *ptr)
  163. {
  164.     int ret = -1;
  165.     int quick_flag = 0;
  166.     int incount, outcount;
  167.     int i=0;
  168.     int test_timeout = thread_mgr_test_timeout;
  169.     struct PINT_thread_mgr_bmi_callback *tmp_callback;
  170.     int thread_running=0;
  171.  
  172.     gen_mutex_lock(&bmi_thread_running_mutex);
  173.     thread_running = bmi_thread_running;
  174.     gen_mutex_unlock(&bmi_thread_running_mutex);
  175. #ifdef __PVFS2_JOB_THREADED__
  176.     PINT_event_thread_start("BMI");
  177.     while (thread_running)
  178. #endif
  179.     {/*start block*/
  180.     gen_mutex_lock(&bmi_mutex);
  181.     if(bmi_unexp_count)
  182.     {
  183.         incount = bmi_unexp_count;
  184.         if(incount > THREAD_MGR_TEST_COUNT)
  185.         incount = THREAD_MGR_TEST_COUNT;
  186.         gen_mutex_unlock(&bmi_mutex);
  187.  
  188.         ret = BMI_testunexpected(
  189.                 incount, &outcount, stat_bmi_unexp_array, 0);
  190.         if (ret < 0)
  191.         {
  192.                 PVFS_perror_gossip("critical BMI failure", ret);
  193. #ifdef __PVFS2_JOB_THREADED__
  194.                 continue;
  195. #endif
  196.  
  197.                 return NULL;
  198.         }
  199.  
  200.         /* execute callback for each completed unexpected message */
  201.             if (outcount > 0)
  202.             {
  203.                 gen_mutex_lock(&bmi_mutex);
  204.                 for (i=0; i<outcount; i++)
  205.                 {
  206.                     bmi_unexp_fn(&stat_bmi_unexp_array[i]);
  207.                     bmi_unexp_count--;
  208.                 }
  209.                 gen_mutex_unlock(&bmi_mutex);
  210.             }
  211.  
  212.         /* set a flag if we are getting as many incoming BMI unexpected
  213.          * operations as we can handle to indicate that we should cycle
  214.          * quickly 
  215.          */
  216.         if (outcount == THREAD_MGR_TEST_COUNT)
  217.         quick_flag = 1;
  218.     }
  219.     else
  220.     {
  221.         gen_mutex_unlock(&bmi_mutex);
  222.     }
  223.  
  224.     /* decide how long we are willing to wait on the main test call */
  225.     if(quick_flag)
  226.     {
  227.         quick_flag = 0;
  228.         test_timeout = 0;
  229.     }
  230.     else
  231.     {
  232.         test_timeout = thread_mgr_test_timeout;
  233.     }
  234.  
  235.     /* indicate that a test is in progress */
  236.     gen_mutex_lock(&bmi_test_mutex);
  237. #ifdef __PVFS2_JOB_THREADED__
  238.         /* wait politely for any cancel operations to run; else we're
  239.          * too fast in regrabbing the test_mutex and it hangs waiting for
  240.          * that small window where test_flag is zero. */
  241.         while (bmi_test_cancel_waiter) {
  242.             pthread_cond_wait(&bmi_test_cond, &bmi_test_mutex);
  243.         }
  244. #endif
  245.     bmi_test_flag = 1;
  246.     gen_mutex_unlock(&bmi_test_mutex);
  247.     
  248.     incount = THREAD_MGR_TEST_COUNT;
  249.     bmi_test_count = 0;
  250.  
  251.         memset(stat_bmi_user_ptr_array, 0,
  252.                (THREAD_MGR_TEST_COUNT * sizeof(void *)));
  253.  
  254.     ret = BMI_testcontext(incount, stat_bmi_id_array, &bmi_test_count,
  255.         stat_bmi_error_code_array, stat_bmi_actual_size_array,
  256.         stat_bmi_user_ptr_array, test_timeout, global_bmi_context);
  257.  
  258.     gen_mutex_lock(&bmi_test_mutex);
  259.     bmi_test_flag = 0;
  260. #ifdef __PVFS2_JOB_THREADED__
  261.     pthread_cond_signal(&bmi_test_cond);
  262. #endif
  263.     gen_mutex_unlock(&bmi_test_mutex);
  264.  
  265.     if(ret < 0)
  266.     {
  267.         PVFS_perror_gossip("critical BMI failure.\n", ret);
  268. #ifdef __PVFS2_JOB_THREADED__
  269.             gossip_err("bmi_thread_function thread terminating\n");
  270.             break;
  271. #endif
  272.  
  273.             return NULL;
  274.     }
  275.  
  276.     for(i=0; i<bmi_test_count; i++)
  277.     {
  278.         /* execute a callback for each completed BMI operation */
  279.         tmp_callback = (struct PINT_thread_mgr_bmi_callback*)
  280.                 stat_bmi_user_ptr_array[i];
  281.  
  282.             if (!tmp_callback || !tmp_callback->fn)
  283.             {
  284.                 gossip_err("critical BMI failure (null callback)\n");
  285. #ifdef __PVFS2_JOB_THREADED__
  286.                 gossip_err("bmi_thread_function thread terminating\n");
  287.                 break;
  288. #endif
  289.                 continue;
  290.             }
  291.  
  292.         tmp_callback->fn(tmp_callback->data,
  293.                              stat_bmi_actual_size_array[i],
  294.                              stat_bmi_error_code_array[i]);
  295.     }
  296.         gen_mutex_lock(&bmi_thread_running_mutex);
  297.         thread_running = bmi_thread_running;
  298.         gen_mutex_unlock(&bmi_thread_running_mutex);
  299.     } /*end block*/
  300.  
  301. #ifdef __PVFS2_JOB_THREADED__
  302.     PINT_event_thread_stop();
  303. #endif
  304.     return (NULL);
  305. }
  306.  
  307. /* dev_thread_function()
  308.  *
  309.  * function executed by the thread in charge of the device interface
  310.  */
  311. static void *dev_thread_function(void *ptr)
  312. {
  313.     int ret = -1;
  314.     int incount, outcount;
  315.     int i=0;
  316.     int timeout = thread_mgr_test_timeout;
  317.  
  318. #ifdef __PVFS2_JOB_THREADED__
  319.     while (dev_thread_running)
  320. #endif
  321.     {
  322.     gen_mutex_lock(&dev_mutex);
  323.     incount = dev_unexp_count;
  324.         while(incount == 0)
  325.         {
  326.             /* we need to wait until more unexp dev operations are posted */
  327. #ifdef __PVFS2_JOB_THREADED__
  328.             pthread_cond_wait(&dev_unexp_test_cond, &dev_mutex);
  329.             incount = dev_unexp_count;
  330. #else
  331.             gen_mutex_unlock(&dev_mutex);
  332.             return(NULL);
  333. #endif
  334.         }
  335.     if(incount > THREAD_MGR_TEST_COUNT)
  336.         {
  337.         incount = THREAD_MGR_TEST_COUNT;
  338.         }
  339.     gen_mutex_unlock(&dev_mutex);
  340.  
  341.     ret = PINT_dev_test_unexpected(
  342.             incount, &outcount, stat_dev_unexp_array, timeout);
  343.  
  344.     if (ret < 0)
  345.     {
  346.             PVFS_perror_gossip("critical device failure", ret);
  347.             gossip_err("Exiting...\n");
  348.             /* exit with a particular code so that the pvfs2-client wrapper
  349.              * knows that it should not attempt a restart of the
  350.              * pvfs2-client-core
  351.              */
  352.             exit(-PVFS_ENODEV);
  353. #ifdef __PVFS2_JOB_THREADED__
  354.             gossip_err("dev_thread_function thread terminating\n");
  355.             break;
  356. #endif
  357.             return NULL;
  358.     }
  359.  
  360.     /* execute callback for each completed unexpected message */
  361.     gen_mutex_lock(&dev_mutex);
  362.     for(i=0; i<outcount; i++)
  363.     {
  364.         dev_unexp_fn(&stat_dev_unexp_array[i]);
  365.         dev_unexp_count--;
  366.     }
  367.     gen_mutex_unlock(&dev_mutex);
  368.     }
  369.  
  370.     return (NULL);
  371. }
  372.  
  373.  
  374. /* PINT_thread_mgr_dev_start()
  375.  *
  376.  * starts a dev mgmt thread, if not already running
  377.  *
  378.  * returns 0 on success, -PVFS_error on failure
  379.  */
  380. int PINT_thread_mgr_dev_start(void)
  381. {
  382.     int ret = 0;
  383.  
  384.     gen_mutex_lock(&dev_mutex);
  385.     if(dev_thread_ref_count > 0)
  386.     {
  387.     /* nothing to do, thread is already started.  Just increment 
  388.      * reference count and return
  389.      */
  390.     dev_thread_ref_count++;
  391.         goto out;
  392.     }
  393.  
  394.     dev_thread_running = 1;
  395. #ifdef __PVFS2_JOB_THREADED__
  396.     ret = pthread_create(&dev_thread_id, NULL, dev_thread_function, NULL);
  397.     if(ret != 0)
  398.     {
  399.     dev_thread_running = 0;
  400.     /* TODO: convert error code */
  401.         ret = -ret;
  402.         goto out;
  403.     }
  404. #endif
  405.     dev_thread_ref_count++;
  406.  
  407. out:
  408.     gen_mutex_unlock(&dev_mutex);
  409.     return ret;
  410. }
  411.  
  412.  
  413.  
  414. /* PINT_thread_mgr_trove_start()
  415.  *
  416.  * starts a trove mgmt thread, if not already running
  417.  *
  418.  * returns 0 on success, -PVFS_error on failure
  419.  */
  420. int PINT_thread_mgr_trove_start(void)
  421. {
  422.     int ret;
  423.  
  424.     gen_mutex_lock(&trove_mutex);
  425.     if(trove_thread_ref_count > 0)
  426.     {
  427.     /* nothing to do, thread is already started.  Just increment 
  428.      * reference count and return
  429.      */
  430.     trove_thread_ref_count++;
  431.     gen_mutex_unlock(&trove_mutex);
  432.     return(0);
  433.     }
  434.  
  435. #ifdef __PVFS2_TROVE_SUPPORT__
  436.     /* if we reach this point, then we have to start the thread ourselves */
  437.     ret = trove_open_context(HACK_fs_id, &global_trove_context);
  438.     if(ret < 0)
  439.     {
  440.     gen_mutex_unlock(&trove_mutex);
  441.     return(ret);
  442.     }
  443.     trove_thread_ref_count++;
  444. #ifdef __PVFS2_JOB_THREADED__
  445.     trove_thread_running = 1;
  446.     ret = pthread_create(&trove_thread_id, NULL, trove_thread_function, NULL);
  447.     if(ret != 0)
  448.     {
  449.     trove_close_context(HACK_fs_id, global_trove_context);
  450.     gen_mutex_unlock(&trove_mutex);
  451.     trove_thread_running = 0;
  452.     /* TODO: convert error code */
  453.     return(-ret);
  454.     }
  455. #endif /* PVFS2_JOB_THREADED */
  456. #else
  457.     ret = 0;
  458.     assert(0);
  459. #endif /* PVFS2_TROVE_SUPPORT */
  460.  
  461.     gen_mutex_unlock(&trove_mutex);
  462.     return(0);
  463. }
  464.  
  465.  
  466. /* PINT_thread_mgr_bmi_start()
  467.  *
  468.  * starts a BMI mgmt thread, if not already running
  469.  *
  470.  * returns 0 on success, -PVFS_error on failure
  471.  */
  472. int PINT_thread_mgr_bmi_start(void)
  473. {
  474.     int ret = -1;
  475.  
  476.     gen_mutex_lock(&bmi_mutex);
  477.     if(bmi_thread_ref_count > 0)
  478.     {
  479.     /* nothing to do, thread is already started.  Just increment 
  480.      * reference count and return
  481.      */
  482.     bmi_thread_ref_count++;
  483.     gen_mutex_unlock(&bmi_mutex);
  484.     return(0);
  485.     }
  486.  
  487.     /* if we reach this point, then we have to start the thread ourselves */
  488.     ret = BMI_open_context(&global_bmi_context);
  489.     if(ret < 0)
  490.     {
  491.     gen_mutex_unlock(&bmi_mutex);
  492.     return(ret);
  493.     }
  494.  
  495.     gen_mutex_lock(&bmi_thread_running_mutex);
  496.     bmi_thread_running = 1;
  497.     gen_mutex_unlock(&bmi_thread_running_mutex);
  498. #ifdef __PVFS2_JOB_THREADED__
  499.     ret = pthread_create(&bmi_thread_id, NULL, bmi_thread_function, NULL);
  500.     if(ret != 0)
  501.     {
  502.     BMI_close_context(global_bmi_context);
  503.     gen_mutex_unlock(&bmi_mutex);
  504.         gen_mutex_lock(&bmi_thread_running_mutex);
  505.     bmi_thread_running = 0;
  506.         gen_mutex_unlock(&bmi_thread_running_mutex);
  507.     /* TODO: convert error code */
  508.     return(-ret);
  509.     }
  510. #endif
  511.     bmi_thread_ref_count++;
  512.  
  513.     gen_mutex_unlock(&bmi_mutex);
  514.     return(0);
  515. }
  516.  
  517. /* PINT_thread_mgr_dev_stop()
  518.  *
  519.  * stops a Trove mgmt thread
  520.  *
  521.  * returns 0 on success, -PVFS_error on failure
  522.  */
  523. int PINT_thread_mgr_dev_stop(void)
  524. {
  525.     gen_mutex_lock(&dev_mutex);
  526.     dev_thread_ref_count--;
  527.     if(dev_thread_ref_count <= 0)
  528.     {
  529.     assert(dev_thread_ref_count == 0); /* sanity check */
  530.     dev_thread_running = 0;
  531.         gen_mutex_unlock(&dev_mutex);
  532. #ifdef __PVFS2_JOB_THREADED__
  533.     pthread_join(dev_thread_id, NULL);
  534. #endif
  535.     }
  536.     else
  537.     {
  538.         gen_mutex_unlock(&dev_mutex);
  539.     }
  540.     return(0);
  541. }
  542.  
  543. /* PINT_thread_mgr_bmi_cancel()
  544.  *
  545.  * cancels a pending BMI operation for which the callback function has not
  546.  * yet been executed
  547.  *
  548.  * returns 0 on success, -PVFS_error on failure
  549.  */
  550. int PINT_thread_mgr_bmi_cancel(PVFS_id_gen_t id, void* user_ptr)
  551. {
  552.     int i;
  553.     int ret;
  554.  
  555.     /* wait until we can guarantee that a BMI_testcontext() is not in
  556.      * progress
  557.      */
  558.     gen_mutex_lock(&bmi_test_mutex);
  559.     ++bmi_test_cancel_waiter;
  560.     while(bmi_test_flag == 1)
  561.     {
  562. #ifdef __PVFS2_JOB_THREADED__
  563.     pthread_cond_wait(&bmi_test_cond, &bmi_test_mutex);
  564. #else
  565.     /* this condition shouldn't be possible without threads */
  566.     assert(0);
  567. #endif
  568.     }
  569.     --bmi_test_cancel_waiter;
  570.  
  571.     /* iterate down list of pending completions, to see if the caller is
  572.      * trying to cancel one of them
  573.      */
  574.     gossip_debug(GOSSIP_JOB_DEBUG,
  575.                  "%s: trying to cancel opid: %llu, ptr: %p.\n",
  576.              __func__, llu(id), user_ptr);
  577.     for(i=0; i<bmi_test_count; i++)
  578.     {
  579. #if 0
  580.     gossip_err("THREAD MGR bmi cancel scanning op: %llu.\n", 
  581.         llu(stat_bmi_id_array[i]));
  582. #endif
  583.     if(stat_bmi_id_array[i] == id && stat_bmi_user_ptr_array[i] ==
  584.         user_ptr)
  585.     {
  586. #if 0
  587.         gossip_err("THREAD MGR bmi cancel SKIPPING op: %llu.\n", 
  588.         llu(stat_bmi_id_array[i]));
  589. #endif
  590.         /* match; no steps needed to cancel, the op is already done */
  591.         gen_mutex_unlock(&bmi_test_mutex);
  592.         return(0);
  593.     }
  594.     }
  595.  
  596.     /* tell BMI to cancel the operation */
  597.     ret = BMI_cancel(id, global_bmi_context);
  598.     if(ret < 0)
  599.     gossip_err("WARNING: BMI cancel failed, proceeding anyway.\n");
  600. #ifdef __PVFS2_JOB_THREADED__
  601.     /* release waiting testcontext thread */
  602.     pthread_cond_signal(&bmi_test_cond);
  603. #endif
  604.     gen_mutex_unlock(&bmi_test_mutex);
  605.     return(ret);
  606. }
  607.  
  608. /* PINT_thread_mgr_trove_stop()
  609.  *
  610.  * stops a Trove mgmt thread
  611.  *
  612.  * returns 0 on success, -PVFS_error on failure
  613.  */
  614. int PINT_thread_mgr_trove_stop(void)
  615. {
  616.     gen_mutex_lock(&trove_mutex);
  617.     trove_thread_ref_count--;
  618.     if(trove_thread_ref_count <= 0)
  619.     {
  620.     assert(trove_thread_ref_count == 0); /* sanity check */
  621.     trove_thread_running = 0;
  622.         gen_mutex_unlock(&trove_mutex);
  623. #ifdef __PVFS2_JOB_THREADED__
  624.     pthread_join(trove_thread_id, NULL);
  625. #endif
  626. #ifdef __PVFS2_TROVE_SUPPORT__
  627.     trove_close_context(HACK_fs_id, global_trove_context);
  628. #else
  629.     assert(0);
  630. #endif
  631.     }
  632.     else
  633.     {
  634.         gen_mutex_unlock(&trove_mutex);
  635.     }
  636.     return(0);
  637. }
  638.  
  639.  
  640. /* PINT_thread_mgr_bmi_stop()
  641.  *
  642.  * stops a BMI mgmt thread, if not already running
  643.  *
  644.  * returns 0 on success, -PVFS_error on failure
  645.  */
  646. int PINT_thread_mgr_bmi_stop(void)
  647. {
  648.     gen_mutex_lock(&bmi_mutex);
  649.     bmi_thread_ref_count--;
  650.     if(bmi_thread_ref_count <= 0)
  651.     {
  652.     assert(bmi_thread_ref_count == 0); /* sanity check */
  653.         gen_mutex_lock(&bmi_thread_running_mutex);
  654.     bmi_thread_running = 0;
  655.         gen_mutex_unlock(&bmi_thread_running_mutex);
  656.         gen_mutex_unlock(&bmi_mutex);
  657. #ifdef __PVFS2_JOB_THREADED__
  658.     pthread_join(bmi_thread_id, NULL);
  659. #endif
  660.     BMI_close_context(global_bmi_context);
  661.     }
  662.     else
  663.     {
  664.         gen_mutex_unlock(&bmi_mutex);
  665.     }
  666.     return(0);
  667. }
  668.  
  669. /* PINT_thread_mgr_trove_getcontext()
  670.  *
  671.  * retrieves the context that the current trove thread is using
  672.  *
  673.  * returns 0 on success, -PVFS_error on failure
  674.  */
  675. int PINT_thread_mgr_trove_getcontext(PVFS_context_id *context)
  676. {
  677.     gen_mutex_lock(&trove_mutex);
  678.     if(trove_thread_ref_count > 0)
  679.     {
  680.     *context = global_trove_context;
  681.     gen_mutex_unlock(&trove_mutex);
  682.     return(0);
  683.     }
  684.     gen_mutex_unlock(&trove_mutex);
  685.  
  686.     return(-PVFS_EINVAL);
  687. }
  688.  
  689. /* PINT_thread_mgr_trove_cancel()
  690.  *
  691.  * cancels a pending Trove operation for which the callback function has not
  692.  * yet been executed
  693.  *
  694.  * returns 0 on success, -PVFS_error on failure
  695.  */
  696. int PINT_thread_mgr_trove_cancel(PVFS_id_gen_t id,
  697.                  PVFS_fs_id fs_id,
  698.                  void* user_ptr)
  699. {
  700.     int i;
  701.     int ret;
  702.  
  703.     /* wait until we can guarantee that a trove_testcontext() is not in
  704.      * progress
  705.      */
  706.     gen_mutex_lock(&trove_test_mutex);
  707.     while(trove_test_flag == 1)
  708.     {
  709. #ifdef __PVFS2_JOB_THREADED__
  710.     pthread_cond_wait(&trove_test_cond, &trove_test_mutex);
  711. #else
  712.     /* this condition shouldn't be possible without threads */
  713.     assert(0);
  714. #endif
  715.     }
  716.  
  717.     /* iterate down list of pending completions, to see if the caller is
  718.      * trying to cancel one of them
  719.      */
  720.     for(i=0; i<trove_test_count; i++)
  721.     {
  722. #if 0
  723.     gossip_err("THREAD MGR trove cancel scanning op: %llu.\n", 
  724.         llu(stat_trove_id_array[i]));
  725. #endif
  726.     if(stat_trove_id_array[i] == id && stat_trove_user_ptr_array[i] ==
  727.         user_ptr)
  728.     {
  729. #if 0
  730.         gossip_err("THREAD MGR trove cancel SKIPPING op: %llu.\n", 
  731.         llu(stat_trove_id_array[i]));
  732. #endif
  733.         /* match; no steps needed to cancel, the op is already done */
  734.         gen_mutex_unlock(&trove_test_mutex);
  735.         return(0);
  736.     }
  737.     }
  738.  
  739.     /* tell Trove to cancel the operation */
  740. #ifdef __PVFS2_TROVE_SUPPORT__
  741.     ret = trove_dspace_cancel(fs_id, id, global_trove_context);
  742. #else
  743.     ret = 0;
  744.     assert(0);
  745. #endif
  746.     gen_mutex_unlock(&trove_test_mutex);
  747.     return(ret);
  748. }
  749.  
  750. /* PINT_thread_mgr_bmi_getcontext()
  751.  *
  752.  * retrieves the context that the current BMI thread is using
  753.  *
  754.  * returns 0 on success, -PVFS_error on failure
  755.  */
  756. int PINT_thread_mgr_bmi_getcontext(PVFS_context_id *context)
  757. {
  758.     gen_mutex_lock(&bmi_mutex);
  759.     if(bmi_thread_ref_count > 0)
  760.     {
  761.     *context = global_bmi_context;
  762.     gen_mutex_unlock(&bmi_mutex);
  763.     return(0);
  764.     }
  765.     gen_mutex_unlock(&bmi_mutex);
  766.  
  767.     return(-PVFS_EINVAL);
  768. }
  769.  
  770. /* PINT_thread_mgr_dev_unexp_handler()
  771.  *
  772.  * registers a handler for unexpected device messages
  773.  *
  774.  * returns 0 on success, -PVFS_error on failure
  775.  */
  776. int PINT_thread_mgr_dev_unexp_handler(
  777.     void (*fn)(struct PINT_dev_unexp_info* unexp))
  778. {
  779.     /* sanity check */
  780.     assert(fn != 0);
  781.  
  782.     gen_mutex_lock(&dev_mutex);
  783.     if(dev_unexp_count > 0 && fn != dev_unexp_fn)
  784.     {
  785.     gossip_lerr("Error: dev_unexp_handler already set.\n");
  786.     gen_mutex_unlock(&dev_mutex);
  787.     return(-PVFS_EALREADY);
  788.     }
  789.     dev_unexp_fn = fn;
  790.     dev_unexp_count++;
  791.     if(dev_unexp_count == 1)
  792.     {
  793.         /* signal worker thread that may have been waiting for more ops */
  794. #ifdef __PVFS2_JOB_THREADED__
  795.         pthread_cond_signal(&dev_unexp_test_cond);
  796. #endif
  797.     }
  798.     gen_mutex_unlock(&dev_mutex);
  799.     return(0);
  800. }
  801.  
  802.  
  803. /* PINT_thread_mgr_bmi_unexp_handler()
  804.  *
  805.  * registers a handler for unexpected BMI messages
  806.  *
  807.  * returns 0 on success, -PVFS_error on failure
  808.  */
  809. int PINT_thread_mgr_bmi_unexp_handler(
  810.     void (*fn)(struct BMI_unexpected_info* unexp))
  811. {
  812.     /* sanity check */
  813.     assert(fn != 0);
  814.  
  815.     gen_mutex_lock(&bmi_mutex);
  816.     if(bmi_unexp_count > 0 && fn != bmi_unexp_fn)
  817.     {
  818.     gossip_lerr("Error: bmi_unexp_handler already set.\n");
  819.     gen_mutex_unlock(&bmi_mutex);
  820.     return(-PVFS_EALREADY);
  821.     }
  822.     bmi_unexp_fn = fn;
  823.     bmi_unexp_count++;
  824.     gen_mutex_unlock(&bmi_mutex);
  825.     return(0);
  826. }
  827.  
  828. /* PINT_thread_mgr_dev_push()
  829.  *
  830.  * pushes on test progress manually, without using threads 
  831.  *
  832.  * no return value 
  833.  */
  834. void PINT_thread_mgr_dev_push(int max_idle_time)
  835. {
  836.     thread_mgr_test_timeout = max_idle_time;
  837.     dev_thread_function(NULL);
  838. }
  839.  
  840.  
  841. /* PINT_thread_mgr_trove_push()
  842.  *
  843.  * pushes on test progress manually, without using threads 
  844.  *
  845.  * no return value 
  846.  */
  847. void PINT_thread_mgr_trove_push(int max_idle_time)
  848. {
  849.     thread_mgr_test_timeout = max_idle_time;
  850.     trove_thread_function(NULL);
  851. }
  852.  
  853. /* PINT_thread_mgr_bmi_push()
  854.  *
  855.  * pushes on test progress manually, without using threads 
  856.  *
  857.  * no return value 
  858.  */
  859. void PINT_thread_mgr_bmi_push(int max_idle_time)
  860. {
  861.     thread_mgr_test_timeout = max_idle_time;
  862.     bmi_thread_function(NULL);
  863. }
  864.  
  865. /*
  866.  * Local variables:
  867.  *  c-indent-level: 4
  868.  *  c-basic-offset: 4
  869.  * End:
  870.  *
  871.  * vim: ts=8 sts=4 sw=4 expandtab
  872.  */
  873.