home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / io / flow / flow.c < prev    next >
C/C++ Source or Header  |  2007-06-01  |  13KB  |  538 lines

  1. /*
  2.  * (C) 2001 Clemson University and The University of Chicago
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. /* This the top level implementation of the flow interface */
  8. /* (see flow.h) */
  9.  
  10. #include <errno.h>
  11. #include <sys/time.h>
  12. #include <unistd.h>
  13. #include <string.h>
  14. #include <assert.h>
  15.  
  16. #include "quicklist.h"
  17. #include "gossip.h"
  18. #include "gen-locks.h"
  19. #include "str-utils.h"
  20. #include "flow.h"
  21. #include "flowproto-support.h"
  22. #include "flow-ref.h"
  23.  
  24. /* mutex lock used to prevent more than one process from entering the
  25.  * interface at a time
  26.  */
  27. static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
  28.  
  29. /* number of active flow protocols */
  30. static int active_flowproto_count = 0;
  31. /* table of active flow protocols */
  32. static struct flowproto_ops **active_flowproto_table = NULL;
  33. /* mappings of flow endpoints to the correct protocol */
  34. static flow_ref_p flow_mapping = NULL;
  35.  
  36. static void flow_release(flow_descriptor * flow_d);
  37.  
  38. /* bring in the flowproto interfaces that we need */
  39. #ifdef __STATIC_FLOWPROTO_TEMPLATE__
  40. extern struct flowproto_ops flowproto_template_ops;
  41. #endif
  42. #ifdef __STATIC_FLOWPROTO_DUMP_OFFSETS__
  43. extern struct flowproto_ops flowproto_dump_offsets_ops;
  44. #endif
  45. #ifdef __STATIC_FLOWPROTO_BMI_CACHE__
  46. extern struct flowproto_ops fp_bmi_cache_ops;
  47. #endif
  48. #ifdef __STATIC_FLOWPROTO_MULTIQUEUE__
  49. extern struct flowproto_ops fp_multiqueue_ops;
  50. #endif
  51.  
  52. static struct flowproto_ops *static_flowprotos[] = {
  53. #ifdef __STATIC_FLOWPROTO_TEMPLATE__
  54.     &flowproto_template_ops,
  55. #endif
  56. #ifdef __STATIC_FLOWPROTO_DUMP_OFFSETS__
  57.     &flowproto_dump_offsets_ops,
  58. #endif
  59. #ifdef __STATIC_FLOWPROTO_BMI_CACHE__
  60.     &fp_bmi_cache_ops,
  61. #endif
  62. #ifdef __STATIC_FLOWPROTO_MULTIQUEUE__
  63.     &fp_multiqueue_ops,
  64. #endif
  65.     NULL
  66. };
  67.  
  68. /* PINT_flow_initialize()
  69.  *
  70.  * initializes the flow interface.  Should be called exactly once before
  71.  * any other operations are performed.
  72.  *
  73.  * flowproto_list specifies which flow protocols to initialize; if NULL,
  74.  * all compiled in flowprotocols will be started
  75.  * TODO: change this so that we can add flowprotocols on the fly as needed
  76.  * rather than having to make the decision on what to init right now
  77.  *
  78.  * returns 0 on success, -errno on failure
  79.  */
  80. int PINT_flow_initialize(
  81.     const char *flowproto_list,
  82.     int flags)
  83. {
  84.     int ret = -1;
  85.     int i = 0, j = 0, already_exists = 0, active_flow_index = 0, 
  86.         requested_flowproto_count = 0;
  87.     char **requested_flowprotos = NULL;
  88.     struct flowproto_ops **tmp_flowproto_ops = NULL;
  89.  
  90.     gen_mutex_lock(&interface_mutex);
  91.  
  92.     if(flowproto_list)
  93.     {
  94.     /* seperate out the list of flowprotos to activate */
  95.     active_flowproto_count = PINT_split_string_list(
  96.             &requested_flowprotos, flowproto_list);
  97.     if (active_flowproto_count < 1)
  98.     {
  99.         gossip_lerr("Error: bad flow protocol list.\n");
  100.         ret = -EINVAL;
  101.         goto PINT_flow_initialize_failure;
  102.     }
  103.         requested_flowproto_count = active_flowproto_count;
  104.     }
  105.     else
  106.     {
  107.     /* count compiled in flow protocols, we will activate all of them */
  108.     tmp_flowproto_ops = static_flowprotos;
  109.     active_flowproto_count = 0;
  110.     while ((*tmp_flowproto_ops) != NULL)
  111.     {
  112.         tmp_flowproto_ops++;
  113.         active_flowproto_count++;
  114.     }
  115.         requested_flowproto_count = active_flowproto_count;
  116.     }
  117.  
  118.     /* create table to keep up with active flow protocols */
  119.     active_flowproto_table =
  120.         malloc((active_flowproto_count * sizeof(struct flowproto_ops *)));
  121.     if (!active_flowproto_table)
  122.     {
  123.     ret = -ENOMEM;
  124.     goto PINT_flow_initialize_failure;
  125.     }
  126.     memset(active_flowproto_table, 0,
  127.            (active_flowproto_count * sizeof(struct flowproto_ops *)));
  128.  
  129.     /* find the interface for each requested method and load it into the
  130.      * active table.
  131.      */
  132.     if(flowproto_list)
  133.     {
  134.     for (i = 0; i < requested_flowproto_count; i++)
  135.     {
  136.         tmp_flowproto_ops = static_flowprotos;
  137.         while ((*tmp_flowproto_ops) != NULL &&
  138.            strcmp((*tmp_flowproto_ops)->flowproto_name,
  139.               requested_flowprotos[i]) != 0)
  140.         {
  141.         tmp_flowproto_ops++;
  142.         }
  143.         if ((*tmp_flowproto_ops) == NULL)
  144.         {
  145.         gossip_lerr("Error: no flowproto available for: %s\n",
  146.                 requested_flowprotos[i]);
  147.         ret = -ENOPROTOOPT;
  148.         goto PINT_flow_initialize_failure;
  149.         }
  150.  
  151.             /* check that flowproto hasn't already been added to table */
  152.             already_exists = 0;
  153.             for(j = 0; j < active_flow_index; ++j)
  154.             {
  155.                 if(active_flowproto_table[j] == (*tmp_flowproto_ops))
  156.                 {
  157.                     already_exists = 1;
  158.                 }
  159.             }
  160.             
  161.             if(!already_exists)
  162.             {
  163.                 active_flowproto_table[active_flow_index++] = 
  164.                     (*tmp_flowproto_ops);
  165.             }
  166.             else
  167.             {
  168.                 active_flowproto_count--;
  169.             }
  170.     }
  171.     }
  172.     else
  173.     {
  174.     tmp_flowproto_ops = static_flowprotos;
  175.     for(i=0; i<active_flowproto_count; i++)
  176.     {
  177.         active_flowproto_table[i] = (*tmp_flowproto_ops);
  178.         tmp_flowproto_ops++;
  179.     }
  180.     }
  181.  
  182.     /* create a cache of mappings to flow protocols */
  183.     flow_mapping = flow_ref_new();
  184.     if (!flow_mapping)
  185.     {
  186.     ret = -ENOMEM;
  187.     goto PINT_flow_initialize_failure;
  188.     }
  189.  
  190.     /* initialize all of the flow protocols */
  191.     for (i = 0; i < active_flowproto_count; i++)
  192.     {
  193.     ret = active_flowproto_table[i]->flowproto_initialize(i);
  194.     if (ret < 0)
  195.     {
  196.         gossip_lerr("Error: could not initialize protocol: %s.\n",
  197.             active_flowproto_table[i]->flowproto_name);
  198.         goto PINT_flow_initialize_failure;
  199.     }
  200.     }
  201.  
  202.     /* get rid of method string list */
  203.     PINT_free_string_list(requested_flowprotos, requested_flowproto_count);
  204.  
  205.     gen_mutex_unlock(&interface_mutex);
  206.     return (0);
  207.  
  208.   PINT_flow_initialize_failure:
  209.  
  210.     /* shut down any protocols which may have started */
  211.     if (active_flowproto_table)
  212.     {
  213.     for (i = 0; i < active_flowproto_count; i++)
  214.     {
  215.         if (active_flowproto_table[i])
  216.         {
  217.         active_flowproto_table[i]->flowproto_finalize();
  218.         }
  219.     }
  220.     free(active_flowproto_table);
  221.     }
  222.  
  223.     /* get rid of method string list */
  224.     PINT_free_string_list(requested_flowprotos, requested_flowproto_count);
  225.     active_flowproto_count = 0;
  226.  
  227.     if (flow_mapping)
  228.     {
  229.     flow_ref_cleanup(flow_mapping);
  230.     }
  231.     gen_mutex_unlock(&interface_mutex);
  232.     return (ret);
  233. }
  234.  
  235. /* PINT_flow_finalize()
  236.  *
  237.  * shuts down the flow interface.  
  238.  *
  239.  * returns 0 on success, -errno on failure
  240.  */
  241. int PINT_flow_finalize(void)
  242. {
  243.     int i = 0;
  244.     int ret = 0, tret;
  245.  
  246.     gen_mutex_lock(&interface_mutex);
  247.  
  248.     /* shut down each active protocol */
  249.     for (i = 0; i < active_flowproto_count; i++)
  250.     {
  251.     tret = active_flowproto_table[i]->flowproto_finalize();
  252.         if (tret)
  253.             ret = tret;
  254.     }
  255.  
  256.     free(active_flowproto_table);
  257.  
  258.     active_flowproto_count = 0;
  259.  
  260.     flow_ref_cleanup(flow_mapping);
  261.  
  262.     gen_mutex_unlock(&interface_mutex);
  263.     return ret;
  264. }
  265.  
  266. /* PINT_flow_alloc()
  267.  * 
  268.  * Allocates a new flow descriptor and sets the source and destination
  269.  * endpoints.
  270.  *
  271.  * returns pointer to descriptor on success, NULL on failure
  272.  */
  273. flow_descriptor *PINT_flow_alloc(void)
  274. {
  275.     flow_descriptor *tmp_desc = NULL;
  276.  
  277.     tmp_desc = (flow_descriptor *)malloc(sizeof(struct flow_descriptor));
  278.     if (tmp_desc)
  279.     {
  280.         gen_mutex_init(&tmp_desc->flow_mutex);
  281.         PINT_flow_reset(tmp_desc);
  282.     }
  283.     return tmp_desc;
  284. }
  285.  
  286.  
  287. /* PINT_flow_reset()
  288.  * 
  289.  * resets an existing flow descriptor to its initial state 
  290.  *
  291.  * returns pointer to descriptor on success, NULL on failure
  292.  */
  293. void PINT_flow_reset(flow_descriptor *flow_d)
  294. {
  295.     assert(flow_d);
  296.  
  297.     memset(flow_d, 0, sizeof(struct flow_descriptor));
  298.  
  299.     flow_d->flowproto_id = -1;
  300.     flow_d->aggregate_size = -1;
  301.     flow_d->state = FLOW_INITIAL;
  302.     flow_d->type = FLOWPROTO_DEFAULT;
  303.     flow_d->buffers_per_flow = -1;
  304.     flow_d->buffer_size = -1;
  305. }
  306.  
  307. /* PINT_flow_free()
  308.  * 
  309.  * destroys a flow descriptor
  310.  *
  311.  * no return value
  312.  */
  313. void PINT_flow_free(flow_descriptor *flow_d)
  314. {
  315.     assert(flow_d);
  316.  
  317.     gen_mutex_destroy(&flow_d->flow_mutex);
  318.     free(flow_d);
  319. }
  320.  
  321. /* PINT_flow_clear()
  322.  * 
  323.  * clears the flow descriptor but doesn't destroy it
  324.  *
  325.  * no return value
  326.  */
  327. void PINT_flow_clear(flow_descriptor *flow_d)
  328. {
  329.     assert(flow_d);
  330.  
  331.     memset(flow_d, 0, sizeof(flow_descriptor));
  332. }
  333.  
  334. /* PINT_flow_post()
  335.  * 
  336.  * Posts a flow descriptor to the flow interface so that it may be
  337.  * processed
  338.  *
  339.  * returns 0 on success, -errno on failure
  340.  */
  341. int PINT_flow_post(flow_descriptor * flow_d)
  342. {
  343.     int flowproto_id = -1;
  344.     int ret = -1;
  345.     int i;
  346.     int type = flow_d->type;
  347.  
  348.     assert(flow_d->callback);
  349.     /* sanity check; if the caller doesn't provide a memory datatype,
  350.      * then the must at least indicate the aggregate size to transfer
  351.      */
  352.     assert(flow_d->aggregate_size > -1 || flow_d->mem_req != 0);
  353.  
  354.     gen_mutex_lock(&interface_mutex);
  355.  
  356.     /* NOTE: if an error occurs here, then we will normally just return
  357.      * -errno and _not_ set any error codes in the flow descriptor.
  358.      */
  359.  
  360.     /* search for match to specified flow protocol type */
  361.     for(i=0; i<active_flowproto_count; i++)
  362.     {
  363.     ret =
  364.         active_flowproto_table[i]->flowproto_getinfo(NULL,
  365.         FLOWPROTO_TYPE_QUERY,
  366.         &type);
  367.     if(ret >= 0)
  368.     {
  369.         flowproto_id = i;
  370.         break;
  371.     }
  372.     }
  373.  
  374.     if (flowproto_id < 0)
  375.     {
  376.     gen_mutex_unlock(&interface_mutex);
  377.     gossip_err("Error: requested flow protocol %d, which doesn't appear to be loaded.\n", (int)type);
  378.     return (-ENOPROTOOPT);
  379.     }
  380.  
  381.     /* setup the request processing states */
  382.     flow_d->file_req_state = PINT_new_request_state(flow_d->file_req);
  383.     if (!flow_d->file_req_state)
  384.     {
  385.     gen_mutex_unlock(&interface_mutex);
  386.     return (-EINVAL);
  387.     }
  388.  
  389.     /* only setup a memory datatype state if caller provided a memory datatype */
  390.     if(flow_d->mem_req)
  391.     {
  392.     flow_d->mem_req_state = PINT_new_request_state(flow_d->mem_req);
  393.     if (!flow_d->mem_req_state)
  394.     {
  395.         gen_mutex_unlock(&interface_mutex);
  396.         return (-EINVAL);
  397.     }
  398.     }
  399.  
  400.     flow_d->release = flow_release;
  401.  
  402.     /* post the flow to the flow protocol level */
  403.     flow_d->flowproto_id = flowproto_id;
  404.     ret = active_flowproto_table[flowproto_id]->flowproto_post(flow_d);
  405.     gen_mutex_unlock(&interface_mutex);
  406.     return (ret);
  407. }
  408.  
  409.  
  410. /* PINT_flow_cancel()
  411.  * 
  412.  * attempts to cancel a previously posted (but not yet completed) flow
  413.  *
  414.  * returns 0 on successful attempt, -errno on failure
  415.  */
  416. int PINT_flow_cancel(flow_descriptor * flow_d)
  417. {
  418.     int ret;
  419.  
  420.     gen_mutex_lock(&interface_mutex);
  421.     assert(flow_d);
  422.     assert(flow_d->flowproto_id >= 0);
  423.  
  424.     if(active_flowproto_table[flow_d->flowproto_id]->flowproto_cancel)
  425.     {
  426.     ret =
  427.     active_flowproto_table[flow_d->flowproto_id]->flowproto_cancel(flow_d);
  428.     }
  429.     else
  430.     {
  431.     ret = -ENOSYS;
  432.     }
  433.     
  434.     gen_mutex_unlock(&interface_mutex);
  435.     return (ret);
  436. }
  437.  
  438. /* PINT_flow_setinfo()
  439.  * 
  440.  * Used to pass along hints or configuration info to the flow
  441.  * interface
  442.  *
  443.  * returns 0 on success, -errno on failure
  444.  */
  445. int PINT_flow_setinfo(flow_descriptor *flow_d,
  446.               int option,
  447.               void *parameter)
  448. {
  449.     int ret, tret, i;
  450.  
  451.     gen_mutex_lock(&interface_mutex);
  452.     if (flow_d)
  453.     {
  454.         ret = active_flowproto_table[
  455.             flow_d->flowproto_id]->flowproto_setinfo(
  456.                 flow_d, option, parameter);
  457.     }
  458.     else
  459.     {
  460.         ret = -ENOSYS;  /* success if any flowproto could handle it */
  461.         for(i = 0; i < active_flowproto_count; i++)
  462.         {
  463.             tret = active_flowproto_table[i]->flowproto_setinfo(
  464.                 flow_d, option, parameter);
  465.             if (tret == 0)
  466.                 ret = 0;
  467.         }
  468.     }
  469.     gen_mutex_unlock(&interface_mutex);
  470.  
  471.     return ret;
  472. }
  473.  
  474.  
  475. /* PINT_flow_getinfo()
  476.  * 
  477.  * Used to query for parameters or information from the flow interface
  478.  *
  479.  * returns 0 on success, -errno on failure
  480.  */
  481. int PINT_flow_getinfo(flow_descriptor *flow_d,
  482.               enum flow_getinfo_option opt,
  483.               void *parameter)
  484. {
  485.     PVFS_size* tmp_size;
  486.  
  487.     gen_mutex_lock(&interface_mutex);
  488.     
  489.     switch(opt)
  490.     {
  491.     case FLOW_AMT_COMPLETE_QUERY:
  492.     tmp_size = (PVFS_size*)parameter;
  493.     *tmp_size = flow_d->total_transferred;
  494.     break;
  495.     default:
  496.     break;
  497.     }
  498.  
  499.     gen_mutex_unlock(&interface_mutex);
  500.     return (0);
  501. }
  502.  
  503. /*****************************************************************
  504.  * Internal helper functions
  505.  */
  506.  
  507. /* TODO: we need to do this from the flow protocol level now */
  508. /* TODO: or else incorporate it into callback */
  509. /* flow_release()
  510.  *
  511.  * releases any resources associated with a flow before returning it to
  512.  * the user
  513.  *
  514.  * no return value
  515.  */
  516. static void flow_release(flow_descriptor *flow_d)
  517. {
  518.     /* let go of the request processing states */
  519.     if (flow_d->file_req_state)
  520.     {
  521.     PINT_free_request_state(flow_d->file_req_state);
  522.     }
  523.  
  524.     if (flow_d->mem_req_state)
  525.     {
  526.     PINT_free_request_state(flow_d->mem_req_state);
  527.     }
  528. }
  529.  
  530. /*
  531.  * Local variables:
  532.  *  c-indent-level: 4
  533.  *  c-basic-offset: 4
  534.  * End:
  535.  *
  536.  * vim: ts=8 sts=4 sw=4 expandtab
  537.  */
  538.