home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / io / flow / flowproto-bmi-cache / flowproto-bmi-cache-server.c next >
C/C++ Source or Header  |  2010-04-30  |  31KB  |  1,167 lines

  1. /*
  2.  * (C) 2001 Clemson University and The University of Chicago
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. #include <errno.h>
  8. #include <assert.h>
  9. #include <stdlib.h>
  10. #include <string.h>
  11. #include <sys/time.h>
  12. #include <unistd.h>
  13.  
  14. //#undef __PVFS2_TROVE_SUPPORT__
  15.  
  16. #include "gossip.h"
  17. #include "quicklist.h"
  18. #include "src/io/flow/flowproto-support.h"
  19. #include "gen-locks.h"
  20. #include "bmi.h"
  21. #include "thread-mgr.h"
  22.  
  23. #include "trove.h"
  24. #include "ncac-interface.h"
  25. #include "internal.h"
  26. #include "pvfs2-internal.h"
  27.     
  28. #define BUFFER_SIZE (256*1024)
  29. #define MAX_REGIONS 16
  30.  
  31. #define INT_REQ_EMPTY        -1
  32. #define INT_REQ_INIT        0
  33. #define INT_REQ_PROCESSING    1
  34. #define INT_REQ_COMPLETE    2
  35.  
  36. #define NONBLOCKING_FLAG    0
  37. #define BLOCKING_FLAG        1
  38.  
  39. #define BMI_TO_CACHE     0
  40. #define CACHE_TO_BMI    1
  41.  
  42. #define NO_MUTEX_FLAG    0
  43. #define MUTEX_FLAG    1
  44.  
  45.      
  46. /* Noted by wuj: The flowproto-bmi-cache design has significant 
  47.  * differences from the template "bmi-trove" design in handling 
  48.  * buffers. These difference are reflected in the
  49.  * data structure added and/or changed in fp_queue_item and other
  50.  * related structures. 
  51.  */ 
  52.  
  53. struct pint_req_entry
  54. {
  55.     PINT_Request_result result;
  56.     PVFS_size size_list[MAX_REGIONS];
  57.     PVFS_offset offset_list[MAX_REGIONS];
  58. };
  59.  
  60. struct cache_req_entry
  61. {
  62.     cache_request_t request;  /* cache request handle */
  63.     int     errval;        /* error code */
  64.     int mem_cnt;            /* how many buffers */
  65.  
  66.     /* buffer size array, array space provided by the cache */
  67.     PVFS_size *msize_list; 
  68.  
  69.     /* "total_size" is the sum of the size list */
  70.     PVFS_size total_size; 
  71.  
  72.     /* buffer offset array, provided by the cache */
  73.     PVFS_offset **moff_list; 
  74.  
  75.     /* if this is not NULL, this buffer is supplied by the flow */
  76.     PVFS_offset *buffer;
  77.  
  78.     enum bmi_buffer_type buffer_type;
  79. };
  80.  
  81.  
  82. /* fp_queue_item describes an individual request being used within the flow.
  83.  * A request --> a flow --> a list of fp_queue_items */
  84. struct fp_queue_item
  85. {
  86.     /* point to the flow descriptor */
  87.     flow_descriptor*     parent;    
  88.  
  89.     /* PINT request information */
  90.     struct pint_req_entry     pint_req; 
  91.  
  92.     /* cache request information */
  93.     struct cache_req_entry     cache_req; 
  94.     int int_state;
  95.  
  96.     /* flag to show whether the callbacks are set up */
  97.     int     callback_setup;
  98.     struct PINT_thread_mgr_bmi_callback bmi_callback;
  99.     struct PINT_thread_mgr_trove_callback cache_callback;
  100.  
  101.     struct qlist_head list_link;
  102. };
  103.  
  104. /* fp_private_data is information specific to this flow protocol, stored
  105.  * in flow descriptor but hidden from caller
  106.  */
  107. struct fp_private_data
  108. {
  109.     /* point to the flow descriptor */
  110.     flow_descriptor* parent;       
  111.  
  112.     /* PINT request done? */
  113.     int pint_request_done;    
  114.     
  115.     /* PINT request list */
  116.     struct qlist_head pint_req_list;
  117.  
  118.     /* requests completed on the cache side */ 
  119.     struct qlist_head cache_req_done_list;  
  120.  
  121.     PVFS_size total_bytes_processed;
  122.     TROVE_context_id trove_context;
  123.  
  124.     gen_mutex_t flow_mutex;
  125. };
  126.  
  127. #define PRIVATE_FLOW(target_flow)\
  128.     ((struct fp_private_data*)(target_flow->flow_protocol_data))
  129.  
  130. static bmi_context_id global_bmi_context = -1;
  131.  
  132. static TROVE_context_id global_trove_context = -1;
  133.  
  134.  
  135. static void bmi_recv_callback_fn(void *user_ptr,
  136.                  PVFS_size actual_size,
  137.                  PVFS_error error_code);
  138.  
  139. static void bmi_send_callback_fn(void *user_ptr,
  140.                  PVFS_size actual_size,
  141.                  PVFS_error error_code);
  142.  
  143. #if 0
  144. /* the above function is a special case; we need to look at a return
  145.  * value when we invoke it directly, so we use the following function
  146.  * to trigger it from a callback
  147.  */
  148. static void bmi_send_callback_wrapper(void *user_ptr,
  149.                  PVFS_size actual_size,
  150.                  PVFS_error error_code)
  151. {
  152.     bmi_send_callback_fn(user_ptr, actual_size, error_code);
  153.     return;
  154. };
  155. #endif
  156.  
  157. static void cache_read_callback_fn(void *user_ptr,
  158.                    PVFS_error error_code);
  159.  
  160. static void cache_write_callback_fn(void *user_ptr,
  161.                    PVFS_error error_code);
  162.  
  163. /* protocol specific functions */
  164. static int  bmi_cache_request_init(struct fp_private_data *flow_data, 
  165.             int direction);
  166.  
  167. static int  bmi_cache_progress_check(struct flow_descriptor *flow_d, 
  168.             int wait_flag, 
  169.             int mutex_flag,
  170.             int direction);
  171.  
  172.  
  173. static int bmi_cache_check_cache_req(struct fp_queue_item *q_item); 
  174. static int bmi_cache_release_cache_src(struct fp_queue_item *q_item);
  175. static int bmi_cache_init_cache_req(struct fp_queue_item *qitem, int op);
  176.  
  177.  
  178. /* interface prototypes */
  179. static int fp_bmi_cache_initialize(int flowproto_id);
  180.  
  181. static int fp_bmi_cache_finalize(void);
  182.  
  183. static int fp_bmi_cache_getinfo(flow_descriptor * flow_d,
  184.                    int option,
  185.                    void *parameter);
  186.  
  187. static int fp_bmi_cache_setinfo(flow_descriptor * flow_d,
  188.                    int option,
  189.                    void *parameter);
  190.  
  191. static int fp_bmi_cache_post(flow_descriptor * flow_d);
  192.  
  193.  
  194. static char fp_bmi_cache_name[] = "flowproto_bmi_cache";
  195.  
  196. struct flowproto_ops fp_bmi_cache_ops = {
  197.     fp_bmi_cache_name,
  198.     fp_bmi_cache_initialize,
  199.     fp_bmi_cache_finalize,
  200.     fp_bmi_cache_getinfo,
  201.     fp_bmi_cache_setinfo,
  202.     fp_bmi_cache_post
  203. };
  204.  
  205.  
  206. /* TODO: where we initialize cache. For the timebeing, I put it here. 
  207.  * This should be changed later.
  208.  */ 
  209. static int cache_initialized = 0;
  210.  
  211. /* fp_bmi_cache_initialize()
  212.  *
  213.  * starts up the flow protocol
  214.  *
  215.  * returns 0 on succes, -PVFS_error on failure
  216.  */
  217. int fp_bmi_cache_initialize(int flowproto_id)
  218. {
  219.     int ret = -1;
  220.  
  221.     ret = PINT_thread_mgr_bmi_start();
  222.     if(ret < 0)
  223.     return(ret);
  224.     PINT_thread_mgr_bmi_getcontext((PVFS_context_id *)&global_bmi_context);
  225.  
  226.     return(0);
  227. }
  228.  
  229. /* fp_bmi_cache_finalize()
  230.  *
  231.  * shuts down the flow protocol
  232.  *
  233.  * returns 0 on success, -PVFS_error on failure
  234.  */
  235. int fp_bmi_cache_finalize(void)
  236. {
  237.     PINT_thread_mgr_bmi_stop();
  238.     return (0);
  239. }
  240.  
  241. /* fp_bmi_cache_getinfo()
  242.  *
  243.  * retrieves runtime parameters from flow protocol
  244.  *
  245.  * returns 0 on success, -PVFS_error on failure
  246.  */
  247. int fp_bmi_cache_getinfo(flow_descriptor * flow_d,
  248.                    int option,
  249.                    void *parameter)
  250. {
  251.     int* type;
  252.  
  253.     switch(option)
  254.     {
  255.     case FLOWPROTO_TYPE_QUERY:
  256.         type = parameter;
  257.         if(*type == FLOWPROTO_BMI_CACHE)
  258.         return(0);
  259.         else
  260.         return(-PVFS_ENOPROTOOPT);
  261.     default:
  262.         return(-PVFS_ENOSYS);
  263.     }
  264. }
  265.  
  266. /* fp_bmi_cache_setinfo()
  267.  *
  268.  * sets runtime parameters in flow protocol
  269.  *
  270.  * returns 0 on success, -PVFS_error on failure
  271.  */
  272. int fp_bmi_cache_setinfo(flow_descriptor * flow_d,
  273.                    int option,
  274.                    void *parameter)
  275. {
  276.     return (-PVFS_ENOSYS);
  277. }
  278.  
  279. /* fp_bmi_cache_post()
  280.  *
  281.  * posts a flow descriptor to begin work
  282.  *
  283.  * returns 0 on success, 1 on immediate completion, -PVFS_error on failure
  284.  */
  285. int fp_bmi_cache_post(flow_descriptor * flow_d)
  286. {
  287.     struct fp_private_data* flow_data = NULL;
  288.     NCAC_info_t info;
  289.     int ret;
  290.  
  291.     /* on the server side: only two possible types */
  292.     fprintf(stderr, "src.endpoint:%d, desc.endpoint:%d\n", flow_d->src.endpoint_id, flow_d->dest.endpoint_id);
  293.  
  294.     assert( (flow_d->src.endpoint_id == BMI_ENDPOINT && 
  295.         flow_d->dest.endpoint_id == TROVE_ENDPOINT) ||
  296.         (flow_d->src.endpoint_id == TROVE_ENDPOINT &&
  297.         flow_d->dest.endpoint_id == BMI_ENDPOINT) );
  298.  
  299.     /* TODO: seems not right here. coll_id --> trove_context id */
  300.     if ( !cache_initialized ) {
  301.         info.max_req_num = 1000;
  302.         info.extsize     = 32768;
  303.         info.cachesize   = 40*1048576;
  304.         info.cachespace = malloc(info.cachesize);
  305.         if (!info.cachespace) {
  306.             fprintf(stderr, "cannot allocate memory for the cache\n");
  307.             return(-PVFS_ENOMEM);
  308.         }
  309.         ret = trove_open_context(flow_d->dest.u.trove.coll_id, &global_trove_context);
  310.         if (ret < 0)
  311.         {
  312.             fprintf(stderr, "TROVE_open_context() failure.\n");
  313.             return (-1);
  314.         }
  315.  
  316.         ret = cache_init(&info);
  317.         if ( ret < 0 )
  318.         {
  319.             fprintf(stderr, "init cache error\n");
  320.             return ret;
  321.         }
  322.  
  323.         cache_initialized = 1;
  324.         fprintf(stderr, "cache is initialized\n");
  325.     }
  326.  
  327.     flow_data = (struct fp_private_data*)malloc(sizeof(struct fp_private_data));
  328.     if(!flow_data)
  329.         return(-PVFS_ENOMEM);
  330.     memset(flow_data, 0, sizeof(struct fp_private_data));
  331.  
  332.     flow_d->flow_protocol_data = flow_data;
  333.     flow_d->state = FLOW_TRANSMITTING;
  334.  
  335.     /* protocol specific fields */
  336.     flow_data->parent = flow_d;
  337.     INIT_QLIST_HEAD(&flow_data->pint_req_list);
  338.     flow_data->pint_request_done = 0;
  339.     INIT_QLIST_HEAD(&flow_data->cache_req_done_list);
  340.  
  341.     flow_data->trove_context = global_trove_context;
  342.     gen_mutex_init(&flow_data->flow_mutex);
  343.  
  344.     /* if a file datatype offset was specified, go ahead and skip ahead 
  345.       * before doing anything else
  346.      */
  347.     if(flow_d->file_req_offset)
  348.         PINT_REQUEST_STATE_SET_TARGET(flow_d->file_req_state, 
  349.                     flow_d->file_req_offset);
  350.  
  351.     /* set boundaries on file datatype */
  352.     if(flow_d->aggregate_size > -1)
  353.     {
  354.         PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state, 
  355.             flow_d->aggregate_size+flow_d->file_req_offset);
  356.     }
  357.     else
  358.     {
  359.         PINT_REQUEST_STATE_SET_FINAL(flow_d->file_req_state,
  360.                     flow_d->file_req_offset +
  361.             PINT_REQUEST_TOTAL_BYTES(flow_d->mem_req));
  362.     }
  363.  
  364.     /* (1) init requests; (2) check progress; later all progress checks 
  365.      * are driven by callbacks.
  366.      */
  367.  
  368.     if( flow_d->src.endpoint_id == TROVE_ENDPOINT &&
  369.         flow_d->dest.endpoint_id == BMI_ENDPOINT )
  370.     {
  371.         /* CACHE --> BMI flow: read from cache, then send to the 
  372.          * network. When the cache supports callback, the cache 
  373.          * callback triggers BMI operations; the BMI callback triggers 
  374.          * the release of cache resources. 
  375.          * When the cache does not support callback, as in the current
  376.          * implementation, we use the following progress method:
  377.          * (1) wait for the completion of the first cache request;
  378.          * (2) initiate the BMI send operation;
  379.          * (3) in the BMI send callback function, we wait for the 
  380.          *     completion of the next cache request;  
  381.          * (4) goto step (2).
  382.          */ 
  383.  
  384.         ret = bmi_cache_request_init(flow_data, CACHE_TO_BMI);
  385.         if ( ret < 0 ) {
  386.             PVFS_perror_gossip("bmi_cache_request_init error: "
  387.                                            "error_code", ret);
  388.             return ret;
  389.         }
  390.  
  391.         if ( ret == 1 ) { /* immediate completion */
  392.             return ret;
  393.         }
  394.         
  395.         /* check progress. MUTEX_FLAG is needed because possible
  396.          * callbacks may happen at the same time.
  397.          */
  398.         ret = bmi_cache_progress_check( flow_d, 
  399.                         BLOCKING_FLAG,
  400.                         MUTEX_FLAG,
  401.                         CACHE_TO_BMI );
  402.         if ( ret < 0 )
  403.         {
  404.             PVFS_perror_gossip("bmi_cache_progress_check error: "
  405.                                            "error_code", ret);
  406.             return ret;
  407.         }
  408.  
  409.     }
  410.     else if( flow_d->src.endpoint_id == BMI_ENDPOINT &&
  411.              flow_d->dest.endpoint_id == TROVE_ENDPOINT )
  412.     {
  413.         /* BMI--->CACHE flow: (1) init requests; (2) check progress;
  414.          * later all progress checks are driven by callbacks;
  415.          *   cache callbacks to indicate that data buffers are 
  416.          *   available;
  417.          *    then trigger BMI receive operations to receive data 
  418.          *     into the cache buffers;
  419.          *    the BMI call back function will trigger releasing 
  420.          *    cache buffers.
  421.          * In the current implementation, cache does not provide 
  422.          * callback (we will add it later), the progress of cache 
  423.          * is checked in the BMI call back function. That means, in 
  424.          * the BMI call back function, we release cache buffers for 
  425.          * the previous queue item; then we see if the cache buffers 
  426.          * are available for another queue item (polling with time 
  427.          * idle), if yes, initiate BMI receive operations.
  428.          */ 
  429.  
  430.         ret = bmi_cache_request_init(flow_data, BMI_TO_CACHE);
  431.         if ( ret < 0 ) {
  432.             PVFS_perror_gossip("bmi_cache_request_init error: "
  433.                                            "error_code", ret);
  434.             return ret;
  435.         }
  436.  
  437.         if ( ret == 1 ) { /* immediate completion */
  438.             return ret;
  439.         }
  440.  
  441.         /* check progress. MUTEX_FLAG is needed because possible
  442.          * callbacks may happen at the same time.
  443.          */
  444.  
  445.         ret = bmi_cache_progress_check(flow_d, 
  446.                     BLOCKING_FLAG, 
  447.                     MUTEX_FLAG,
  448.                     BMI_TO_CACHE);
  449.         if ( ret < 0 )
  450.         {
  451.             PVFS_perror_gossip("bmi_cache_progress_check error: "
  452.                                            "error_code", ret);
  453.             return ret;
  454.         }
  455.  
  456.     }
  457.     else
  458.     {
  459.         return(-ENOSYS);
  460.     }
  461.  
  462.     return (0);
  463. }
  464.  
  465.  
  466. /* bmi_cache_request_init(): get request information from "request component",
  467.  * and initiate the related cache requests
  468.  * The basic idea in bmi_cache_request_init() is to chop a big request into 
  469.  * several smaller requests (we called fp_queue_item) and link these small 
  470.  * requests together. The direct purposes is to enable pipelining and to 
  471.  * reduce buffer preasure.
  472.  * return: 0: success;  <0 error; 1: complete;
  473.  */ 
  474.  
  475. int  bmi_cache_request_init(struct fp_private_data *flow_data, int direction)
  476. {
  477.     struct flow_descriptor *flow_d = flow_data->parent;
  478.     struct fp_queue_item *new_qitem = NULL;
  479.     struct pint_req_entry* pint_req = NULL;
  480.     PVFS_size bytes_processed = 0;
  481.  
  482.     int ret;
  483.  
  484.     fprintf(stderr, "bmi_cache_request_init: enter\n");
  485.  
  486.     /* Handle a zero request. TODO: check whether this is right or not. */
  487.     if ( flow_d->file_data.fsize == 0 ) 
  488.     {
  489.         flow_d->state = FLOW_COMPLETE;
  490.         free(flow_data);
  491.         flow_d->release(flow_d);
  492.         flow_d->callback(flow_d, 0);
  493.         fprintf(stderr, "bmi_cache_request_init: exit with return 1. zero request.\n");
  494.         return(1);
  495.     }
  496.  
  497.     /* get request information before launching cache request.
  498.      * Basically, we need to know what the request is, including
  499.      * file regions, each region offset, and each region length. 
  500.      * A pipelining idea is included in the following steps:
  501.      *    a large flow request ---> several requests with "MAX_REGIONS"
  502.      *       and "BUFFER_SIZE" ---> pint_req_list 
  503.      * Later, we process each element in the pint_req_list:
  504.      *  1) dequeue an element from the list;
  505.      *    2) init cache request;
  506.      *  3) drive BMI operations;
  507.      *  4) enqueue the element into another list;
  508.      * If all elements have been through the above steps,
  509.      * the request is done.
  510.      */
  511.  
  512.     assert ( flow_data->pint_request_done == 0 );
  513.  
  514.     /* get PINT_requests and initiate related cache requests */
  515.     do {
  516.         new_qitem = (struct fp_queue_item *)malloc(sizeof(struct fp_queue_item));
  517.         assert(new_qitem);
  518.         memset(new_qitem, 0 , sizeof(struct fp_queue_item));
  519.  
  520.         new_qitem->parent = flow_d;
  521.         
  522.         /* process request */
  523.         pint_req = & new_qitem->pint_req;
  524.         pint_req->result.offset_array = pint_req->offset_list;
  525.         pint_req->result.size_array = pint_req->size_list;
  526.         pint_req->result.bytemax = BUFFER_SIZE;
  527.         pint_req->result.bytes = 0;
  528.         pint_req->result.segmax = MAX_REGIONS;
  529.         pint_req->result.segs = 0;
  530.  
  531.         ret = PINT_process_request( flow_d->file_req_state,
  532.                     flow_d->mem_req_state,
  533.                     &flow_d->file_data,
  534.                     &pint_req->result,
  535.                     PINT_SERVER );
  536.         /* TODO: error handling */
  537.         assert(ret >= 0);
  538.  
  539.         /* submit the cache request */
  540.         ret = bmi_cache_init_cache_req(new_qitem, direction);
  541.  
  542.         /* TODO: error handling */
  543.         assert(ret >= 0);
  544.  
  545.         new_qitem->int_state = INT_REQ_PROCESSING;
  546.  
  547.         /* immediate completion on the cache request */
  548.         if ( ret == 1 ) 
  549.         {
  550.             fprintf(stderr, "bmi_cache_init_cache_req: immediate completion\n");
  551.             new_qitem->int_state = INT_REQ_COMPLETE;
  552.         }
  553.         
  554.         /* put the request into the chain */
  555.         qlist_add_tail(&new_qitem->list_link, &flow_data->pint_req_list);
  556.         bytes_processed += pint_req->result.bytes;
  557.  
  558.        } while( !PINT_REQUEST_DONE(flow_d->file_req_state) );
  559.  
  560.     
  561.     flow_data->pint_request_done = 1;
  562.  
  563.     flow_data->total_bytes_processed = bytes_processed;
  564.  
  565.     fprintf(stderr, "bmi_cache_request_init: exit with return 0 (bytes_processed=%lld)\n", lld(bytes_processed));
  566.  
  567.     return 0;
  568.  
  569. } /* end of bmi_cache_request_init() */
  570.  
  571.  
  572. /* bmi_cache_progress_check(): 
  573.  *     check progress on both cache requests and bmi requests.
  574.  *    mutex_flag:    
  575.  *     (1) mutex_flag is on.
  576.  *        -- this is called in the first time for the flow progress.
  577.  *     (2) mutex_flag is off
  578.  *        -- this is called in other callbacks which hold mutex.
  579.  *    wait_flag: 
  580.      (1) non-blocking; (2) blocking    
  581.  *    direction:
  582.  *     (1) BMI_TO_CACHE; (2) CACHE_TO_BMI
  583.  *
  584.  * return values:
  585.  *      < 0: error
  586.  */
  587.  
  588. int bmi_cache_progress_check(struct flow_descriptor *flow_d, 
  589.             int wait_flag, 
  590.             int mutex_flag,
  591.             int direction)
  592. {
  593.     struct fp_private_data *flow_data = PRIVATE_FLOW(flow_d);
  594.     struct fp_queue_item *q_item = NULL;
  595.     struct qlist_head *tmp_link = NULL;
  596.     int doneflag = 0;
  597.     int ret;
  598.  
  599.     assert ( flow_data->pint_request_done == 1 );
  600.  
  601.     if ( qlist_empty(&flow_data->pint_req_list) ) 
  602.         return 1; 
  603.  
  604.     fprintf(stderr, "bmi_cache_progress_check: enter (direction:%d)\n", direction);
  605.     doneflag = 0;
  606.  
  607.     if ( mutex_flag ) {
  608.         gen_mutex_lock(&flow_data->flow_mutex);
  609.     }
  610.  
  611. check_again:
  612.  
  613.     qlist_for_each(tmp_link, &flow_data->pint_req_list)
  614.     {
  615.         q_item = qlist_entry(tmp_link, struct fp_queue_item, list_link);
  616.  
  617.         if ( q_item->int_state == INT_REQ_INIT )
  618.         {
  619.             /* wrong state */
  620.             PVFS_perror_gossip("bmi_cache_progress_check error: "
  621.                                            "wrong internal state:error code", -1);
  622.  
  623.             if ( mutex_flag )
  624.                 gen_mutex_unlock(&flow_data->flow_mutex);
  625.  
  626.             /* TODO: error code */
  627.             return -1;
  628.         }
  629.  
  630.         /* internal reuqest has been issued, but in processing */
  631.         if ( q_item->int_state == INT_REQ_PROCESSING ) 
  632.         {
  633.             /* check the cache request */
  634.             ret = bmi_cache_check_cache_req(q_item); 
  635.  
  636.             /* TODO: error handling */
  637.             if ( ret < 0 ) 
  638.             {
  639.                 if ( mutex_flag ) 
  640.                     gen_mutex_unlock(&flow_data->flow_mutex);
  641.                 PVFS_perror_gossip("bmi_cache_progress_check error: check cache request error:error code", -1);
  642.  
  643.                 return -1;
  644.             }
  645.     
  646.             if ( ret == 1 ) { 
  647.                 q_item->int_state = INT_REQ_COMPLETE;
  648.             }
  649.         }
  650.  
  651.         /* internal reuqest has been finished, buffers are available 
  652.          * for BMI operations.
  653.          */
  654.         if ( q_item->int_state == INT_REQ_COMPLETE )
  655.         {
  656.             fprintf(stderr, "bmi_cache_progress_check: cache req done\n");
  657.  
  658.             /* move the item from pint_req_list to cache_req_done_list */
  659.             qlist_del(&q_item->list_link);
  660.             qlist_add_tail(&q_item->list_link, &flow_data->cache_req_done_list);
  661.  
  662.             if ( direction == BMI_TO_CACHE ) 
  663.             {
  664.                 if ( mutex_flag )
  665.                     gen_mutex_unlock(&flow_data->flow_mutex);
  666.                 cache_write_callback_fn(q_item, 0);
  667.  
  668.                 if ( mutex_flag )
  669.                     gen_mutex_lock(&flow_data->flow_mutex);
  670.             } else
  671.             {
  672.                 if ( mutex_flag )
  673.                     gen_mutex_unlock(&flow_data->flow_mutex);
  674.                 cache_read_callback_fn(q_item, 0);
  675.  
  676.                 if ( mutex_flag )
  677.                     gen_mutex_lock(&flow_data->flow_mutex);
  678.             }
  679.             doneflag = 1;
  680.         }
  681.         
  682.         if ( !q_item->callback_setup ) 
  683.         {
  684.             q_item->bmi_callback.fn = bmi_recv_callback_fn;
  685.             q_item->bmi_callback.data = q_item;
  686. #ifdef __CACHE_CALLBACK_SUPPORT__
  687.             q_item->cache_callback.fn = cache_write_callback_fn;
  688.             q_item->cache_callback.data = q_item;
  689. #else
  690.             q_item->cache_callback.fn = NULL;
  691.             q_item->cache_callback.data = NULL;
  692. #endif
  693.  
  694.             q_item->callback_setup = 1;
  695.         }
  696.         
  697.         if ( doneflag ) break;
  698.         
  699.     } /* qlist_for_each element request */
  700.     
  701.     if ( !doneflag && wait_flag == BLOCKING_FLAG )
  702.         goto check_again;
  703.  
  704.     if ( mutex_flag )
  705.         gen_mutex_unlock(&flow_data->flow_mutex);
  706.     
  707.     return 0;
  708. }
  709.  
  710.  
  711. /* bmi_recv_callback_fn()
  712.  *
  713.  * function to be called upon completion of a BMI recv operation
  714.  * Two main jobs:
  715.  *   (1) release cache buffers
  716.  *   (2) if the cache does not support callback, check progress in the cache
  717.  *       side.
  718.  * 
  719.  * no return value
  720.  */
  721. static void bmi_recv_callback_fn(void *user_ptr,
  722.                  PVFS_size actual_size,
  723.                  PVFS_error error_code)
  724. {
  725.     struct fp_queue_item* q_item = user_ptr;
  726.     struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
  727.     struct flow_descriptor * flow_d = flow_data->parent;
  728.  
  729.     struct qlist_head *tmp_link = NULL;
  730.  
  731.     int ret;
  732.  
  733.     /* TODO: error handling */
  734.     if(error_code != 0)
  735.     {
  736.         PVFS_perror_gossip("bmi_recv_callback_fn error_code", 
  737.                     error_code);
  738.         assert(0);
  739.     }
  740.  
  741.     gen_mutex_lock(&flow_data->flow_mutex);
  742.  
  743.     /* remove from current queue */
  744.     qlist_del(&q_item->list_link);
  745.  
  746.     /* WRONG: Add to another list for resource reclaim */
  747.  
  748.     flow_d->total_transferred += actual_size;
  749.  
  750.  
  751.     /* release cache resource, since data has been received into 
  752.      * the cache buffers 
  753.      */
  754.  
  755.     ret = bmi_cache_release_cache_src(q_item);
  756.  
  757.     /* TODO: error handling */
  758.     if ( ret < 0 ) {
  759.         PVFS_perror_gossip("bmi_recv_callback_fn: "
  760.                                    "error from  bmi_cache_release_cache_src:error_code", ret);
  761.         gen_mutex_unlock(&flow_data->flow_mutex);
  762.         return;
  763.     }
  764.  
  765.     /* when no callback support from the cache, we take advantage of 
  766.      * the BMI callback to make progress. That is, in the BMI callback 
  767.      * function, we wait for the completion of another request from 
  768.      * the cache component, then initiate BMI recv function. All these 
  769.      * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
  770.      * Be careful of "mutex" stuff. 
  771.      */
  772.  
  773.     /* no callback support from the cache */
  774.     if ( !q_item->cache_callback.fn )  
  775.     {
  776.         ret = bmi_cache_progress_check( flow_d, 
  777.                         BLOCKING_FLAG, 
  778.                         NO_MUTEX_FLAG,
  779.                         BMI_TO_CACHE);
  780.         if ( ret < 0 )
  781.         {
  782.             PVFS_perror_gossip("bmi_recv_callback_fn: "
  783.                                            "error from bmi_cache_progress_check:error_code", ret);
  784.             gen_mutex_unlock(&flow_data->flow_mutex);
  785.             return;
  786.         }
  787.         /* the whole flow request is finished */
  788.         if ( ret == 1 ){
  789.             gen_mutex_unlock(&flow_data->flow_mutex);
  790.  
  791.             /* free fp_queue_item */
  792.             qlist_for_each(tmp_link, &flow_data->cache_req_done_list) 
  793.             {
  794.                 q_item = qlist_entry(tmp_link, struct fp_queue_item, list_link);
  795.                 qlist_del(&q_item->list_link);
  796.                 free(q_item);
  797.                 fprintf(stderr, "bmi_recv_callback_fn: free fp_queue_item\n");
  798.                 
  799.             }
  800.             free(flow_data);
  801.             flow_d->state = FLOW_COMPLETE;
  802.             flow_d->release(flow_d);
  803.             flow_d->callback(flow_d, 0);
  804.             fprintf(stderr, "bmi_recv_callback_fn: request is done\n");
  805.             return;
  806.         }
  807.     }
  808.  
  809.     gen_mutex_unlock(&flow_data->flow_mutex);
  810.     return;
  811. }
  812.     
  813. /* cache_write_callback_fn()
  814.  *
  815.  * function to be called upon completion of a cache write operation. 
  816.  * The completion means that the cache component provides needed 
  817.  * buffers to flow. In this function, we initiate BMI receive requests 
  818.  * to transfer data into the cache buffers. 
  819.  *
  820.  * no return value
  821.  */
  822. static void cache_write_callback_fn(void *user_ptr,
  823.                    PVFS_error error_code)
  824. {
  825.     PVFS_size tmp_actual_size;
  826.     PVFS_size total_size;
  827.     int i;
  828.  
  829. #ifdef __CACHE_CALLBACK_SUPPORT__
  830.     struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
  831. #endif
  832.  
  833.     struct fp_queue_item* q_item = user_ptr;
  834.     int ret;
  835.     PVFS_id_gen_t bmi_reqid;
  836.  
  837.     /* TODO: error handling */
  838.     assert(error_code == 0);
  839.     
  840.     fprintf(stderr, "cache_write_callback_fn: enter\n");
  841.  
  842.     /* if cache supports callback, this function will called
  843.      * independently. Otherwise, it is called inside of mutex.
  844.      * Thus, there is no need holding mutex.
  845.      */ 
  846. #ifdef __CACHE_CALLBACK_SUPPORT__
  847.     gen_mutex_lock(&flow_data->flow_mutex);
  848. #endif
  849.  
  850.     q_item->int_state = INT_REQ_COMPLETE;
  851.  
  852.     /* TODO: cache_req.total_size in the buffer code */
  853.     total_size = 0;
  854.     for ( i=0;  i<q_item->cache_req.mem_cnt; i++ ) {
  855.         total_size += q_item->cache_req.msize_list[i];
  856.         fprintf(stderr, "cache_write_callback_fn:recv buff [%d] len:%lld\n", i, lld(q_item->cache_req.msize_list[i]));
  857.         q_item->cache_req.total_size = total_size;
  858.     }
  859.     fprintf(stderr, "cache_write_callback_fn:to recv %lld\n", lld(q_item->cache_req.total_size));
  860.  
  861.     /* TODO: what if we recv less than expected? */
  862.     ret = BMI_post_recv_list(&bmi_reqid,
  863.         q_item->parent->src.u.bmi.address,
  864.         (void **)q_item->cache_req.moff_list,
  865.         q_item->cache_req.msize_list,
  866.         q_item->cache_req.mem_cnt,
  867.         q_item->cache_req.total_size,
  868.             &tmp_actual_size,
  869.         q_item->cache_req.buffer_type,
  870.             q_item->parent->tag,
  871.             &q_item->bmi_callback,
  872.             global_bmi_context,
  873.         q_item->parent->hints);
  874.  
  875.     /* TODO: error handling */
  876.     assert(ret >= 0);
  877.  
  878.     if(ret == 1)
  879.     {
  880. #ifdef __CACHE_CALLBACK_SUPPORT__
  881.         gen_mutex_unlock(&flow_data->flow_mutex);
  882. #endif
  883.         /* immediate completion; trigger callback ourselves */
  884.         bmi_recv_callback_fn(q_item, tmp_actual_size, 0);
  885.  
  886. #ifdef __CACHE_CALLBACK_SUPPORT__
  887.         gen_mutex_lock(&flow_data->flow_mutex);
  888. #endif
  889.     }
  890.  
  891. #ifdef __CACHE_CALLBACK_SUPPORT__
  892.     gen_mutex_unlock(&flow_data->flow_mutex);
  893. #endif
  894.  
  895.     return;
  896. };
  897.  
  898.  
  899.  
  900. /* The following two functions are for flow from the CACHE to BMI 
  901.  * direction: cache_read_callback_fn() and bmi_send_callback_fn().
  902.  */
  903.  
  904. /* cache_read_callback_fn()
  905.  *
  906.  * function to be called upon completion of a cache read operation. 
  907.  * The completion means that the cache component provides needed 
  908.  * buffers with data to flow. In this function, we initiate BMI 
  909.  * send requests to send data out to the network.
  910.  *
  911.  * no return value
  912.  */
  913. static void cache_read_callback_fn(void *user_ptr,
  914.                    PVFS_error error_code)
  915. {
  916.     PVFS_size sent_size;
  917.     struct fp_queue_item* q_item = user_ptr;
  918.  
  919. #ifdef __CACHE_CALLBACK_SUPPORT
  920.     struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
  921. #endif
  922.  
  923.     PVFS_id_gen_t bmi_reqid;
  924.     int ret;
  925.  
  926.     /* TODO: error handling */
  927.     assert(error_code == 0);
  928.  
  929. #ifdef __CACHE_CALLBACK_SUPPORT__
  930.     gen_mutex_lock(&flow_data->flow_mutex);
  931. #endif
  932.  
  933.     q_item->int_state = INT_REQ_COMPLETE;
  934.  
  935.     ret = BMI_post_send_list(&bmi_reqid,
  936.         q_item->parent->dest.u.bmi.address,
  937.         (const void **)q_item->cache_req.moff_list,
  938.         q_item->cache_req.msize_list,
  939.         q_item->cache_req.mem_cnt,
  940.         q_item->cache_req.total_size,
  941.         q_item->cache_req.buffer_type,
  942.         q_item->parent->tag,
  943.         &q_item->bmi_callback,
  944.         global_bmi_context,
  945.         (bmi_hint)q_item->parent->hints);
  946.         
  947.     /* TODO: error handling */
  948.     assert(ret >= 0);
  949.  
  950.     if(ret == 1)
  951.     {
  952. #ifdef __CACHE_CALLBACK_SUPPORT__
  953.         gen_mutex_unlock(&flow_data->flow_mutex);
  954. #endif
  955.  
  956.         /* immediate completion; trigger callback ourselves */
  957.         sent_size = q_item->cache_req.total_size;
  958.         bmi_send_callback_fn(q_item, sent_size, 0);
  959.  
  960. #ifdef __CACHE_CALLBACK_SUPPORT__
  961.         gen_mutex_lock(&flow_data->flow_mutex);
  962. #endif
  963.     }
  964.  
  965. #ifdef __CACHE_CALLBACK_SUPPORT__
  966.     gen_mutex_unlock(&flow_data->flow_mutex);
  967. #endif
  968.  
  969.     return;
  970. };
  971.  
  972. /* bmi_send_callback_fn()
  973.  *
  974.  * function to be called upon completion of a BMI send operation
  975.  * Two main jobs:
  976.  *   (1) release cache buffers
  977.  *   (2) if the cache does not support callback, check progress on 
  978.  *       the cache side.
  979.  * 
  980.  * no return value
  981.  */
  982. static void bmi_send_callback_fn(void *user_ptr,
  983.                  PVFS_size actual_size,
  984.                  PVFS_error error_code)
  985. {
  986.     struct fp_queue_item* q_item = user_ptr;
  987.     struct fp_private_data* flow_data = PRIVATE_FLOW(q_item->parent);
  988.     struct flow_descriptor * flow_d = flow_data->parent;
  989.     int ret;
  990.  
  991.     /* TODO: error handling */
  992.     if(error_code != 0)
  993.     {
  994.         PVFS_perror_gossip("bmi_send_callback_fn: error", error_code);
  995.         assert(0);
  996.     }
  997.  
  998.     gen_mutex_lock(&flow_data->flow_mutex);
  999.  
  1000.  
  1001.     flow_d->total_transferred += actual_size;
  1002.  
  1003.     /* release cache resource */
  1004.  
  1005.     ret = bmi_cache_release_cache_src(q_item);
  1006.  
  1007.     /* TODO: error handling */
  1008.     if ( ret < 0 ) {
  1009.         PVFS_perror_gossip("bmi_send_callback_fn: "
  1010.                                    "from bmi_cache_release_cache_src:error_code", ret);
  1011.         gen_mutex_unlock(&flow_data->flow_mutex);
  1012.         return;
  1013.     }
  1014.  
  1015.     /* remove from current queue. q_item must be in the 
  1016.      * cache_req_done_list when coming here.  */
  1017.  
  1018.     qlist_del(&q_item->list_link);
  1019.  
  1020.     /* when no callback support from the cache, we take advantage of 
  1021.      * the BMI callback to make progress. That is, in the BMI callback 
  1022.      * function, we wait for the completion of another request from 
  1023.      * the cache component, then initiate BMI send function. All these 
  1024.      * are done in "bmi_cache_progress_check()" with BLOCKING_FLAG.
  1025.      */
  1026.  
  1027.     /* no callback support from the cache. */
  1028.     if ( !q_item->cache_callback.fn )  
  1029.     {
  1030.         ret = bmi_cache_progress_check( flow_d, 
  1031.                         BLOCKING_FLAG, 
  1032.                         NO_MUTEX_FLAG,
  1033.                         CACHE_TO_BMI );
  1034.         if ( ret < 0 )
  1035.         {
  1036.             PVFS_perror_gossip("bmi_send_callback_fn: "
  1037.                                            "from bmi_cache_progress_check:error_code", ret);
  1038.             gen_mutex_unlock(&flow_data->flow_mutex);
  1039.             return;
  1040.         }
  1041.     }
  1042.  
  1043.     gen_mutex_unlock(&flow_data->flow_mutex);
  1044.  
  1045.     free(q_item);
  1046.  
  1047.     return;
  1048. }
  1049.  
  1050. static int bmi_cache_check_cache_req(struct fp_queue_item *qitem) 
  1051. {
  1052.     cache_reply_t reply;
  1053.     int flag = 0;
  1054.     int ret = -1;
  1055.         
  1056.     fprintf(stderr, "bmi_cache_check_cache_req:enter (req:%d\n", qitem->cache_req.request.internal_id);
  1057.  
  1058.     ret = cache_req_test( &(qitem->cache_req.request), &flag, &reply, NULL);
  1059.     if ( ret < 0 ) 
  1060.     {
  1061.         PVFS_perror_gossip("bmi_cache_check_cache_req: "
  1062.                         "error_code", ret);
  1063.         return ret;
  1064.     }
  1065.  
  1066.     /* a request is finished. */
  1067.     if ( flag )
  1068.     {
  1069.         qitem->cache_req.mem_cnt = reply.count;
  1070.         qitem->cache_req.moff_list = (PVFS_offset **)reply.cbuf_offset_array;
  1071.         qitem->cache_req.msize_list = reply.cbuf_size_array;
  1072.         qitem->cache_req.errval = reply.errval;
  1073.  
  1074.         fprintf(stderr, "bmi_cache_check_cache_req:exit req done count %d\n", reply.count);
  1075.         return 1;
  1076.     }
  1077.     fprintf(stderr, "bmi_cache_check_cache_req:exit req no done\n");
  1078.  
  1079.     return 0;
  1080. }
  1081.  
  1082. static int bmi_cache_release_cache_src(struct fp_queue_item *qitem)
  1083. {
  1084.     int ret;
  1085.     
  1086.     ret = cache_req_done( &(qitem->cache_req.request) );    
  1087.     if ( ret < 0 ) 
  1088.     {
  1089.         PVFS_perror_gossip("bmi_cache_release_cache_src: "
  1090.                         "error_code", ret);
  1091.         return ret;
  1092.     }
  1093.     
  1094.     return 0;
  1095. }
  1096.  
  1097. static int bmi_cache_init_cache_req(struct fp_queue_item *qitem, int op )
  1098. {
  1099.     int ret = 0;
  1100.     cache_read_desc_t desc1;
  1101.     cache_write_desc_t desc2;
  1102.     cache_reply_t reply;
  1103.  
  1104.     if ( op == BMI_TO_CACHE )  /* write */
  1105.     {
  1106.         desc2.coll_id = qitem->parent->dest.u.trove.coll_id;
  1107.         desc2.handle = qitem->parent->dest.u.trove.handle;
  1108.         desc2.context_id = global_trove_context;
  1109.  
  1110.         /* TODO: if we use intemediate buffer, change here */
  1111.         desc2.buffer = NULL;
  1112.         desc2.len = 0;
  1113.         
  1114.         desc2.stream_array_count  = qitem->pint_req.result.segs; 
  1115.         desc2.stream_offset_array = qitem->pint_req.result.offset_array;
  1116.         desc2.stream_size_array = qitem->pint_req.result.size_array;
  1117.  
  1118.         ret = cache_write_post( &desc2, 
  1119.                     &qitem->cache_req.request, 
  1120.                     &reply,
  1121.                     NULL );
  1122.         if ( ret < 0 ) 
  1123.         {
  1124.             PVFS_perror_gossip("bmi_cache_init_cache_req: "
  1125.                                      "error_code", ret);
  1126.         }
  1127.  
  1128.     }
  1129.     else /* read */
  1130.     {
  1131.         desc1.coll_id = qitem->parent->dest.u.trove.coll_id;
  1132.         desc1.handle = qitem->parent->dest.u.trove.handle;
  1133.         desc1.context_id = global_trove_context;
  1134.  
  1135.         /* TODO: if we use intemediate buffer, change here */
  1136.         desc1.buffer = NULL;
  1137.         desc1.len = 0;
  1138.         
  1139.         desc1.stream_array_count  = qitem->pint_req.result.segs; 
  1140.         desc1.stream_offset_array = qitem->pint_req.result.offset_array;
  1141.         desc1.stream_size_array = qitem->pint_req.result.size_array;
  1142.  
  1143.         ret = cache_read_post( &desc1, 
  1144.                     &qitem->cache_req.request, 
  1145.                     &reply,
  1146.                     NULL );
  1147.         if ( ret < 0 ) 
  1148.         {
  1149.             PVFS_perror_gossip("bmi_cache_init_cache_req: "
  1150.                                      "error_code", ret);
  1151.         }
  1152.     }
  1153.  
  1154.     /* immediate completion */
  1155.     if ( ret == 1 ) 
  1156.     {
  1157.         qitem->cache_req.mem_cnt = reply.count;
  1158.         qitem->cache_req.moff_list = (PVFS_offset **)reply.cbuf_offset_array;
  1159.         qitem->cache_req.msize_list = reply.cbuf_size_array;
  1160.         qitem->cache_req.errval = reply.errval;
  1161.  
  1162.         fprintf(stderr, "bmi_cache_init_cache_req: immediate completion mcnt:%d\n", reply.count);
  1163.     }
  1164.  
  1165.     return ret;
  1166. }
  1167.