home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / io / buffer / internal.c < prev    next >
C/C++ Source or Header  |  2006-05-27  |  26KB  |  910 lines

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <errno.h>
  4. #include <string.h>
  5.  
  6. #include "internal.h"
  7. #include "state.h"
  8. #include "flags.h"
  9. #include "aiovec.h"
  10. #include "cache.h"
  11. #include "ncac-job.h"
  12. #include "pvfs2-internal.h"
  13.  
  14. static void NCAC_list_add_tail_lock(struct list_head *new, struct list_head *head, NCAC_lock *lock);
  15. static void NCAC_list_del_lock(struct list_head *entry, NCAC_lock *lock);
  16. static void NCAC_read_request_from_list_lock(struct list_head *head, NCAC_lock *lock, NCAC_req_t ** ncac_req_ptr);
  17.  
  18. /* This file contains NCAC internal functions. */
  19.  
  20. static inline struct inode *get_inode( PVFS_fs_id, PVFS_handle , PVFS_context_id);
  21. static inline int NCAC_rwjob_prepare_single(NCAC_req_t *ncac_req);
  22. static inline int NCAC_rwjob_prepare_list(NCAC_req_t *ncac_req);
  23.  
  24. /* get_internal_req(): get a internal request structure from the free
  25.  * list. To avoid dynamic allocation, for the timebeing, I hard code
  26.  * the total number of outstanding requests. In the future, a dynamic
  27.  * one could be taken. That is, if all static ones have been used, 
  28.  * extra requests can be allocated on demand.
  29.  *
  30.  */
  31.  
  32. static inline struct NCAC_req * get_internal_req_lock( PVFS_fs_id fsid, PVFS_handle hndl)
  33. {
  34.  
  35.     NCAC_req_t *req=NULL;
  36.     struct list_head *new;
  37.  
  38.     list_lock(&NCAC_dev.req_list_lock); 
  39.  
  40.     if ( list_empty(&NCAC_dev.free_req_list) ) return NULL;
  41.  
  42.     new = NCAC_dev.free_req_list.next;
  43.     if ( !new ) {
  44.         fprintf(stderr, "there is no entry in the free req list\n");
  45.         return NULL;
  46.     }
  47.     list_del_init(new);
  48.  
  49.     list_unlock(&NCAC_dev.req_list_lock); 
  50.  
  51.     req = list_entry(new->prev, NCAC_req_t, list);
  52.  
  53.     return req;  
  54. }
  55.  
  56.  
  57. /* add a request into the tail of a list exclusively. */
  58. static void NCAC_list_add_tail_lock(struct list_head *new, struct list_head *head, NCAC_lock *lock)
  59. {
  60.     list_lock(lock);
  61.  
  62.     list_add_tail(new, head);
  63.  
  64.     list_unlock(lock);
  65.  
  66. }
  67.  
  68. /* delete an entry from its list */
  69. static void NCAC_list_del_lock(struct list_head *entry, NCAC_lock *lock)
  70. {
  71.     list_lock(lock);
  72.  
  73.     list_del(entry);
  74.  
  75.     list_unlock(lock);
  76. }
  77.  
  78. /* read an entry without mark from a list and mark the entry. */
  79. static void NCAC_read_request_from_list_lock(struct list_head *head, NCAC_lock *lock, NCAC_req_t ** ncac_req_ptr)
  80. {
  81.     struct list_head *pos;
  82.     NCAC_req_t *req = NULL;
  83.  
  84.     list_lock(lock);
  85.  
  86.     for (pos = head->next; pos != head; pos = pos->next) {
  87.             req = list_entry( pos, NCAC_req_t, list);
  88.         if ( !req->read_out ) {
  89.             req->read_out = 1;
  90.             break;
  91.         }
  92.     }
  93.  
  94.     list_unlock(lock);
  95.  
  96.     *ncac_req_ptr = req;
  97. }
  98.  
  99. /* build internal read/write requests */
  100. NCAC_req_t *NCAC_rwreq_build( NCAC_desc_t *desc, NCAC_optype optype)
  101. {
  102.     void *iovec;
  103.     NCAC_req_t *ncac_req;
  104.     int tmp_off, tmp_size;
  105.  
  106.     ncac_req = get_internal_req_lock(desc->coll_id, desc->handle);
  107.     if (ncac_req == NULL) { /* run out of ncac request resources */
  108.         fprintf(stderr, "no free request\n");
  109.         return NULL;
  110.     }
  111.  
  112.     ncac_req->coll_id         = desc->coll_id;
  113.     ncac_req->handle          = desc->handle;
  114.     ncac_req->context_id      = desc->context_id;
  115.  
  116.     /* inode */
  117.     ncac_req->mapping = get_inode(desc->coll_id, desc->handle, desc->context_id);
  118.     /* inode aiovec */
  119.     ncac_req->aiovec  = &(ncac_req->mapping->aiovec);
  120.     ncac_req->nr_dirty = 0;
  121.  
  122.     if ( desc->buffer ) { /* copy data into the user's buffer */
  123.         if ( optype == NCAC_GEN_READ ) 
  124.             ncac_req->optype = NCAC_BUF_READ;
  125.         else ncac_req->optype = NCAC_BUF_WRITE;
  126.  
  127.         ncac_req->usrbuf  = desc->buffer;
  128.         ncac_req->usrlen  = desc->len;
  129.     }else{ /* use cache buffers for communication */
  130.         if ( optype == NCAC_GEN_READ ) 
  131.             ncac_req->optype = NCAC_READ;
  132.         else ncac_req->optype = NCAC_WRITE;
  133.  
  134.         ncac_req->usrbuf  = NULL;
  135.         ncac_req->usrlen  = 0;
  136.     }
  137.  
  138.     ncac_req->written = 0;  /* size finished */
  139.  
  140.     if ( desc->stream_array_count == 1 ) { /* one segment case */
  141.  
  142.         ncac_req->pos     = desc->stream_offset_array[0];
  143.         ncac_req->size    = desc->stream_size_array[0];
  144.         ncac_req->offcnt  = 0;    /* no vector */
  145.         ncac_req->offvec  = NULL; /* no vector */
  146.         ncac_req->sizevec = NULL; /* no vector */
  147.  
  148.     }else{ /* a list of <off, len> tuples */
  149.         tmp_off = desc->stream_array_count*sizeof(PVFS_offset);
  150.         tmp_size = desc->stream_array_count*sizeof(PVFS_size);
  151.  
  152.         iovec = (void*) malloc( tmp_off + tmp_size );
  153.         if (iovec == NULL ) {
  154.             ncac_req->status = NCAC_NO_MEM;
  155.             return ncac_req;
  156.         }
  157.  
  158.         ncac_req->offcnt = desc->stream_array_count;
  159.         ncac_req->offvec  = (PVFS_offset*)iovec;
  160.         ncac_req->sizevec = (PVFS_size *)( (unsigned long)iovec + tmp_off );
  161.  
  162.         /* copy the user request information into an internal request */
  163.         memcpy(ncac_req->offvec, desc->stream_offset_array, tmp_off);
  164.         memcpy(ncac_req->sizevec, desc->stream_size_array, tmp_size);
  165.     }
  166.  
  167.     /* success */
  168.     ncac_req->status = NCAC_OK;
  169.     return ncac_req;
  170. }
  171.  
  172. /*
  173.  * NCAC_rwjob_prepare(): does three things:
  174.  * (1) allocate resource; caculate index, offset, and length; 
  175.  * (2) put the request in the internal job list
  176.  * (3) make progress of the requests in the job list.
  177.  */
  178. int NCAC_rwjob_prepare(NCAC_req_t *ncac_req, NCAC_reply_t *reply )
  179. {
  180.     int ret;
  181.  
  182.     /* prepare the request */
  183.     if ( !ncac_req->offcnt ) { /* only one contiguous segment */
  184.  
  185.         ret = NCAC_rwjob_prepare_single(ncac_req);
  186.  
  187.     }else{      /* multiple segements */
  188.  
  189.         ret = NCAC_rwjob_prepare_list(ncac_req);
  190.  
  191.     }
  192.     if ( ret < 0 ){
  193.         ncac_req->error = ret;
  194.         return ret;
  195.     }
  196.  
  197.     /* put the request in the internal job list: thread safe */
  198.  
  199.     NCAC_list_add_tail_lock(&ncac_req->list, &NCAC_dev.prepare_list, 
  200.                             &NCAC_dev.req_list_lock);
  201.  
  202.     ncac_req->status = NCAC_REQ_SUBMITTED;
  203.  
  204.     DPRINT("NCAC_rwjob_prepare: %p submitted\n", ncac_req);
  205.  
  206.     /* make progress of jobs: thread safe.
  207.      * Choices here are: 1) do one job; 2) scan the whole list. 
  208.      * Choose 1) here. 
  209.      */
  210.     //ret = NCAC_do_jobs(&(NCAC_dev.req_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock); 
  211.  
  212.     ret = NCAC_do_a_job(ncac_req, &(NCAC_dev.prepare_list), 
  213.                     &(NCAC_dev.bufcomp_list), 
  214.                     &(NCAC_dev.comp_list), 
  215.                     &NCAC_dev.req_list_lock);
  216.  
  217.     if ( ret < 0 ) {
  218.         ncac_req->error = NCAC_JOB_DO_ERR;
  219.         ncac_req->status = NCAC_ERR_STATUS;
  220.         return ret;
  221.     }
  222.  
  223.     ncac_req->error  = NCAC_OK;
  224.  
  225.     return 0;
  226. }
  227.  
  228.  
  229. /* NCAC_rwjob_prepare_single: Given a request which accesses only one
  230.  *      file region, we prepare needed resources for this request:
  231.  *  1) extent cache buffers;
  232.  *  2) Communication buffer address;    
  233.  *  3) Communication buffer sizes;
  234.  *  4) Communication buffer flags;
  235.  *  Given the extent size is 32768 bytes, if a request wants to
  236.  *  read data 32768 bytes from 1024,
  237.  *      (1) two extents: 0-32765, and 32768-65535
  238.  *      (2) comm bufers: extent1.addr+1024, extent2.addr
  239.  *      (3) comm bufer size: 31744, 1024
  240.  *      (4) if data is ready, flag is set.
  241.  *  In this case, the number of extents and the number of communication
  242.  *  buffers are same.
  243.  */
  244.  
  245. static inline int NCAC_rwjob_prepare_single(NCAC_req_t *ncac_req)
  246. {
  247.     int extcnt;  /* cache extent count */
  248.     int comcnt;  /* communication buffer count */
  249.     int allocsize;
  250.  
  251.     PVFS_offset   *foff;
  252.     char          **cbufoff;
  253.     PVFS_size     *cbufsize;
  254.     int           *cbufflag;
  255.     unsigned long firstoff;
  256.     
  257.     int i;
  258.  
  259.     extcnt = (ncac_req->pos + ncac_req->size + NCAC_dev.extsize -1) /
  260.                 NCAC_dev.extsize - ncac_req->pos/NCAC_dev.extsize; 
  261.     comcnt = extcnt;
  262.  
  263.     if ( ncac_req->reserved_cbufcnt < comcnt ) {
  264.         if ( ncac_req->cbufoff ) free( ncac_req->cbufoff);
  265.  
  266.         allocsize = ( sizeof(PVFS_offset) + sizeof(char*) + sizeof(PVFS_size)
  267.             + sizeof(struct extent *) + 3*sizeof(int) ) * comcnt; 
  268.  
  269.         ncac_req->foff  =(PVFS_offset*) malloc(allocsize); 
  270.  
  271.         if ( ncac_req->foff == NULL ) {
  272.             ncac_req->error = -ENOMEM;
  273.             return -ENOMEM;
  274.         }
  275.  
  276.         ncac_req->cbufoff  =(char**) & ncac_req->foff[comcnt];
  277.         ncac_req->cbufsize =(PVFS_size*)  &ncac_req->cbufoff[comcnt];
  278.         ncac_req->cbufhash =(struct extent**)
  279.                             &ncac_req->cbufsize[comcnt];
  280.         ncac_req->cbufflag =(int*) &ncac_req->cbufhash[comcnt];
  281.         ncac_req->cbufrcnt =(int*) &ncac_req->cbufflag[comcnt];
  282.         ncac_req->cbufwcnt =(int*) &ncac_req->cbufrcnt[comcnt];
  283.  
  284.         ncac_req->reserved_cbufcnt = comcnt;
  285.  
  286.         memset(ncac_req->foff, 0, allocsize);
  287.     }
  288.  
  289.     ncac_req->cbufcnt = comcnt;
  290.  
  291.     foff = ncac_req->foff;
  292.     cbufoff = ncac_req->cbufoff;
  293.     cbufsize = ncac_req->cbufsize;
  294.     cbufflag = ncac_req->cbufflag;
  295.  
  296.     /* Setup the related values for foff, cbufoff, and cbufsize */
  297.  
  298.     firstoff = (unsigned long) (ncac_req->pos & (NCAC_dev.extsize -1)); 
  299.     foff[0] = ncac_req->pos - firstoff;
  300.     cbufoff[0] = (char*)firstoff;     /* offsize to the extent address */
  301.     cbufsize[0] = NCAC_dev.extsize - firstoff;
  302.     cbufflag[0] = NCAC_COMM_NOT_READY;
  303.  
  304.     for ( i= 1; i < comcnt; i++){
  305.         foff[i] = foff[i-1] + NCAC_dev.extsize;
  306.         cbufoff[i] = 0;
  307.         cbufsize[i] = NCAC_dev.extsize;
  308.         cbufflag[i] = NCAC_COMM_NOT_READY;
  309.     }
  310.     /* adjust the size of the last buffer in each segment. */
  311.     cbufsize[comcnt-1] = (ncac_req->pos + ncac_req->size)% NCAC_dev.extsize;
  312.  
  313. #if 1
  314.     fprintf(stderr, "[%s] exit %d comm buffers\n", __FUNCTION__, comcnt);
  315.     for (i=0; i<comcnt; i++){
  316.         fprintf(stderr, "fpos:%lld, buf_off:%ld, size:%lld\n", lld(foff[i]),
  317. (unsigned long)cbufoff[i], lld(cbufsize[i]));
  318.     }
  319. #endif
  320.  
  321.     fprintf(stderr, "[%s] exit %d comm buffers\n", __FUNCTION__, comcnt);
  322.     return 0;
  323. }
  324.  
  325. /* NCAC_rwjob_prepare_list: Given a request which accesses a list of
  326.  *      fire regions, we prepare needed resources for this request:
  327.  *  1) extent cache buffers;
  328.  *  2) Communication buffer address;    
  329.  *  3) Communication buffer sizes;
  330.  *  4) Communication buffer flags;
  331.  *  Given the extent size is 32768 bytes, if a request wants to
  332.  *  read data the following regions: (1024, 32768) and (65530, 32768)
  333.  *      (1) Three extents: 0-32765, 32768-65535, 65536-98303
  334.  *      (2) Communication buffers:
  335.  *            extent1.addr+1024, extent2.addr,
  336.  *            extent2.addr+32762, extent3.addr
  337.  *      (3) Communication buffer size:
  338.  *                31744, 1024, 6, and 32762
  339.  *  This example shows that:
  340.  *    (A) For the underlying I/O system, we are going to read
  341.  *        three extents;
  342.  *    (B) For the upper communcation system, we are goint to
  343.  *        user four different buffers.
  344.  *   The number of communication buffers is equal to or larger 
  345.  *   than the number of needed extents.
  346.  */ 
  347.  
  348. struct freg_tuple
  349. {
  350.     PVFS_offset fpos;
  351.     PVFS_size   size;
  352. };
  353.  
  354. static int comp_pos(const void *x1, const void *x2)
  355. {
  356.     const PVFS_offset *num1 = x1;
  357.     const PVFS_offset *num2 = x2;
  358.  
  359.     if (*num1 <  *num2) return -1;
  360.     if (*num1 == *num2) return  0;
  361.     if (*num1 >  *num2) return  1;
  362.     return 0;
  363. }
  364.  
  365. static inline int NCAC_rwjob_prepare_list(NCAC_req_t *ncac_req)
  366. {
  367.     int extcnt;  /* cache extent count */
  368.     int comcnt;  /* communication buffer count */
  369.     int allocsize;
  370.  
  371.     PVFS_offset   *foff;
  372.     char          **cbufoff;
  373.     PVFS_size     *cbufsize;
  374.     int           *cbufflag;
  375.     unsigned long   firstoff;
  376.  
  377.     int i, j;
  378.     int cnt;
  379.  
  380.     struct freg_tuple *fregions;
  381.     
  382.     fregions = (struct freg_tuple *)malloc(ncac_req->offcnt *
  383.                     sizeof(struct freg_tuple));
  384.     if ( NULL == fregions){
  385.         ncac_req->error = -ENOMEM;
  386.         return -ENOMEM;
  387.     }
  388.  
  389.     extcnt = 0;
  390.     for (i = 0; i < ncac_req->offcnt; i ++) {
  391.         extcnt += (ncac_req->offvec[i] + ncac_req->sizevec[i] +
  392.                 NCAC_dev.extsize -1)/NCAC_dev.extsize - 
  393.                 ncac_req->offvec[i]/NCAC_dev.extsize;
  394.  
  395.         fregions[i].fpos = ncac_req->offvec[i];
  396.         fregions[i].size = ncac_req->sizevec[i];
  397.     }
  398.  
  399.     /* Some extents counted by "extcnt" may be same. Also the
  400.      * number of communication buffers should be same as the 
  401.      * extcnt. Use "comcnt" to overprovision resources.
  402.      */
  403.  
  404.     comcnt = extcnt;
  405.  
  406.     if ( ncac_req->reserved_cbufcnt < comcnt ) {
  407.         if ( ncac_req->cbufoff ) free( ncac_req->cbufoff);
  408.  
  409.         allocsize = ( sizeof(PVFS_offset) + sizeof(char*) + sizeof(PVFS_size)
  410.             + sizeof(struct extent *) + 3*sizeof(int) ) * comcnt; 
  411.  
  412.         ncac_req->foff  =(PVFS_offset*) malloc(allocsize); 
  413.  
  414.         if ( ncac_req->foff == NULL ) {
  415.             ncac_req->error = -ENOMEM;
  416.  
  417.             free(fregions);
  418.  
  419.             return -ENOMEM;
  420.         }
  421.  
  422.         ncac_req->cbufoff  =(char**) & ncac_req->foff[comcnt];
  423.         ncac_req->cbufsize =(PVFS_size*)  &ncac_req->cbufoff[comcnt];
  424.         ncac_req->cbufhash =(struct extent**)
  425.                             &ncac_req->cbufsize[comcnt];
  426.         ncac_req->cbufflag =(int*) &ncac_req->cbufhash[comcnt];
  427.         ncac_req->cbufrcnt =(int*) &ncac_req->cbufflag[comcnt];
  428.         ncac_req->cbufwcnt =(int*) &ncac_req->cbufrcnt[comcnt];
  429.  
  430.         ncac_req->reserved_cbufcnt = comcnt;
  431.  
  432.         memset(ncac_req->foff, 0, allocsize);
  433.     }
  434.     
  435.     foff = ncac_req->foff;
  436.     cbufoff = ncac_req->cbufoff;
  437.     cbufsize = ncac_req->cbufsize;
  438.     cbufflag = ncac_req->cbufflag;
  439.  
  440.     /* How many different extents are needed? Put them in an
  441.      * ordered manner to be friendly to the underlying I/O system.
  442.      * What are communication buffers used for the upper layer?
  443.      * (offset to the related extent, size).
  444.      */
  445.     /* quick sort the list of file regions. If the upper layer
  446.      * can present the file regions in an ordered manner, we can
  447.      * eliminate this sorting.
  448.      */
  449.     qsort(fregions, ncac_req->offcnt, sizeof(struct freg_tuple), comp_pos);
  450.  
  451. #if  1
  452.     for (i=0; i<ncac_req->offcnt; i++){
  453.         fprintf(stderr, "fpos:%lld, size:%lld\n", lld(fregions[i].fpos),
  454. lld(fregions[i].size));
  455.     }
  456. #endif
  457.  
  458.     comcnt = 0;
  459.     for ( i =0; i <ncac_req->offcnt; i++){
  460.         cnt = (fregions[i].fpos+fregions[i].size+NCAC_dev.extsize-1)/
  461.             NCAC_dev.extsize - fregions[i].fpos/NCAC_dev.extsize;
  462.  
  463.         firstoff=(unsigned long)(fregions[i].fpos & (NCAC_dev.extsize -1)); 
  464.  
  465.         foff[comcnt] = fregions[i].fpos - firstoff;
  466.         cbufoff[comcnt] = (char*)firstoff;
  467.         cbufsize[comcnt] = NCAC_dev.extsize - firstoff;
  468.         cbufflag[comcnt] = NCAC_COMM_NOT_READY;
  469.  
  470.         for ( j= 1; j < cnt; j++){
  471.             foff[comcnt+j] = foff[comcnt+j-1] + NCAC_dev.extsize;
  472.             cbufoff[comcnt+j] = 0;
  473.             cbufsize[comcnt+j] = NCAC_dev.extsize;
  474.             cbufflag[comcnt+j] = NCAC_COMM_NOT_READY;
  475.         }
  476.         /* adjust the size of the last buffer in each segment. */
  477.         cbufsize[comcnt+cnt-1] -= (fregions[i].fpos+fregions[i].size) % 
  478.                     NCAC_dev.extsize;
  479.  
  480.         comcnt += cnt;
  481.     }
  482.  
  483.     /* so far, in the ncac_req.foff, some extents are probably same,
  484.      * but they are consecutive.
  485.      */
  486.  
  487.     free(fregions);
  488.  
  489.     ncac_req->cbufcnt = comcnt;
  490.  
  491. #if 1
  492.     fprintf(stderr, "[%s] exit %d comm buffers\n", __FUNCTION__, comcnt);
  493.     for (i=0; i<comcnt; i++){
  494.         fprintf(stderr, "fpos:%lld, buf_off:%ld, size:%lld\n", lld(foff[i]),
  495. (unsigned long)cbufoff[i], lld(cbufsize[i]));
  496.     }
  497. #endif
  498.  
  499.     return 0;
  500. }
  501.  
  502. /* NCAC_do_jobs(): this is the workhorse of NCAC.
  503.  * Several things are worth being noted.
  504.  * 1) There are three lists in the NCAC. A request may be in one of
  505.  *    them given a time. It also migrates from one to another.
  506.  *    prepare_list: all submitted requests get first into this list.
  507.  *    bufcomp_list: when a request has all its cache buffers available
  508.  *                  for read, this means all read data are in cache.
  509.  *                  for write, this means all buffers needed to place
  510.  *                  written data are available.
  511.  *    comp_list: when a request does not need cache buffer any more.
  512.  *               There are several cases: for buffered operations, this
  513.  *               means that data have been copied between user buffers
  514.  *               and cache buffers; for non-buffered operations, this
  515.  *               means that the cache consumer has called "cache_req_done"
  516.  *               to notify that communication over buffers has been finished.
  517.  *    
  518.  * 2) trigger:
  519.  *    prepare_list -- (progress engine) --> bufcomp_list -- (cache_req_done)
  520.  *                 --> comp_list
  521.  *    prepare_list -- (progress engine) -->comp_list
  522.  * 
  523.  * 3) NCAC_do_jobs acts as a progress engine and tries to move
  524.  *    requests from prepare_list to bufcompl_list or move them
  525.  *    from prepare_list to comp_list.
  526.  *
  527.  * 4) Order semantics: 
  528.  *    FIFO order is maintained. that is,
  529.  *    for requests from a same client, the request processing order
  530.  *    is the same as the order these requests come into the NCAC.
  531.  * 
  532.  * 5) Locking:
  533.  *    NCAC_do_jobs() could be a thread. We assume that
  534.  *    this thread can be work on the specified list exclusively.
  535.  *    This implies that if multiple "NCAC_do_jobs()" threads exist,
  536.  *    we associate one (or more than one) list(s) to each thread.
  537.  *    Thus, we don't need to lock while in NCAC_do_jobs.
  538.  *
  539.  *    Complication: order semantics??? 
  540.  *    
  541.  * 6) TODO: multiple lists for group ids. That is, we have multiple
  542.  *    prepare_lists and other related lists. One extreme case is that
  543.  *    each <fs_id, handle> has a list.
  544.  *    
  545.  */
  546.  
  547. int NCAC_do_jobs(struct list_head *prep_list, struct list_head *bufcomp_list,
  548.                  struct list_head *comp_list, NCAC_lock *lock)
  549. {
  550.     int ret; 
  551.     NCAC_req_t *ncac_req;
  552.  
  553. dojob:
  554.  
  555.     /* read a request from the prep_list job. When a job is read out 
  556.      * (NOT taken from the list), there is a flag to indicate that 
  557.      * someone else has read this request out. So get_request_from_list 
  558.      * always returns a request which is not read out by others 
  559.      */    
  560.  
  561.     NCAC_read_request_from_list_lock(prep_list, lock, &ncac_req);
  562.  
  563.     if (ncac_req) {
  564.         ret = NCAC_do_a_job(ncac_req, prep_list, bufcomp_list, comp_list, lock); 
  565.  
  566.         ncac_req->read_out = 0;
  567.         if ( ret < 0 ) 
  568.             return ret;
  569.  
  570.         if ( ncac_req->status == NCAC_BUFFER_COMPLETE || 
  571.             ncac_req->status == NCAC_COMPLETE ) 
  572.         goto dojob; 
  573.     }
  574.  
  575.     return 0; 
  576. }
  577.  
  578.  
  579. /* NCAC_do_a_job(): make progress on a particular request.
  580.  * According to the job optype, a particular job horseworker
  581.  * is called. All horseworkers are implemented in "ncac_job.c".
  582.  */
  583.  
  584. int NCAC_do_a_job(NCAC_req_t *ncac_req, struct list_head *prep_list, 
  585.                 struct list_head *bufcomp_list, 
  586.                 struct list_head *comp_list, NCAC_lock *lock)
  587. {
  588.     int ret;
  589.  
  590.     fprintf(stderr, "NCAC_do_a_job enter\n");
  591.  
  592.     switch (ncac_req->optype){
  593.  
  594.         /* cached read */
  595.         case NCAC_READ: 
  596.  
  597.             ret = NCAC_do_a_read_job(ncac_req);
  598.             break;
  599.  
  600.         /* cached write */
  601.         case NCAC_WRITE: 
  602.  
  603.             ret = NCAC_do_a_write_job(ncac_req);
  604.             break;
  605.  
  606.         /* cached buffer read */
  607.         case NCAC_BUF_READ:
  608.  
  609.             ret = NCAC_do_a_bufread_job(ncac_req);
  610.             break;
  611.  
  612.         /* cached buffer write */
  613.         case NCAC_BUF_WRITE:
  614.  
  615.             ret = NCAC_do_a_bufwrite_job(ncac_req);
  616.             break;
  617.  
  618.         case NCAC_QUERY:
  619.             ret = NCAC_do_a_query_job(ncac_req);
  620.             break;
  621.  
  622.         case NCAC_DEMOTE:
  623.             ret = NCAC_do_a_demote_job(ncac_req);
  624.             break;
  625.  
  626.         case NCAC_SYNC:         
  627.             ret = NCAC_do_a_sync_job(ncac_req);
  628.             break;
  629.  
  630.         default:
  631.             ret = NCAC_JOB_OPTYPE_ERR;
  632.             fprintf(stderr, "NCAC_do_a_job: unrecognize optype flag\n");
  633.             break;
  634.     }
  635.  
  636.     if ( ncac_req->status == NCAC_BUFFER_COMPLETE ) {
  637.  
  638.         NCAC_list_del_lock(&ncac_req->list, lock);
  639.  
  640.         NCAC_list_add_tail_lock(&ncac_req->list, bufcomp_list, lock); 
  641.  
  642.     }else if ( ncac_req->status == NCAC_COMPLETE ) 
  643.     {
  644.         NCAC_list_del_lock(&ncac_req->list, lock);
  645.         NCAC_list_add_tail_lock(&ncac_req->list, comp_list, lock); 
  646.     }
  647.  
  648.     fprintf(stderr, "NCAC_do_a_job exit\n");
  649.     return ret;
  650. }
  651.  
  652.  
  653. int NCAC_check_request( int id, struct NCAC_req **ncac_req )
  654. {
  655.     struct NCAC_req *req;
  656.     int ret;
  657.  
  658.     req = &NCAC_dev.free_req_src[id]; 
  659.     if ( req->status == NCAC_COMPLETE || req->status == NCAC_BUFFER_COMPLETE ) {
  660.         *ncac_req = req;
  661.         return 0;
  662.     }
  663.  
  664.     if ( req->status == NCAC_REQ_UNUSED ) {
  665.         *ncac_req = NULL;
  666.         NCAC_error("NCAC_check_request:no such request");
  667.         return -1;
  668.     }
  669.  
  670.     ret = NCAC_do_a_job(req, &(NCAC_dev.prepare_list), &(NCAC_dev.bufcomp_list), &(NCAC_dev.comp_list), &NCAC_dev.req_list_lock);
  671.  
  672.     if ( ret < 0 ) {
  673.         NCAC_error("NCAC_check_request:do a job error (%d)", req->error);
  674.     }
  675.     *ncac_req = req;
  676.     return ret;
  677. }
  678.  
  679. /* done request(): mark a request is done. Several cases:
  680.  *     NCAC_BUFFER_COMPLETE: pending communication on buffers is done.
  681.  *                          state transition.
  682.  *     NCAC_COMPLETE:     nothing with cache buffers.
  683.  *
  684.  * both return ncac_req to the req free list.
  685.  *
  686.  * Tradeoff:  we pre-allocate a list of req structures during initilization.
  687.  * These requests are shared by all flows. So we need to have lock to
  688.  * get and return them to the free list. This is guarded by 
  689.  * NCAC_dev.req_list_lock. We have two benefits at this lock cost.
  690.  * (1) no need to allocate and free request
  691.  * (2) reuse buffer information structure if possible. A lazy free technique
  692.  *     is used to reuse the arrays for buffer information.
  693.  */
  694.  
  695. int NCAC_done_request( int id )
  696. {
  697.     struct NCAC_req *ncac_req;
  698.     int ret = 0;
  699.  
  700.     ncac_req = &NCAC_dev.free_req_src[id]; 
  701.  
  702.     switch ( ncac_req->status ) {
  703.  
  704.         case NCAC_BUFFER_COMPLETE:  /* pending communication is done */
  705.  
  706.             NCAC_list_del_lock(&ncac_req->list, &NCAC_dev.req_list_lock);
  707.             ret = NCAC_extent_done_access( ncac_req );
  708.  
  709.             break;
  710.  
  711.         case NCAC_COMPLETE:
  712.  
  713.             NCAC_list_del_lock(&ncac_req->list, &NCAC_dev.req_list_lock);
  714.             break;
  715.  
  716.         default: /* error. leaking here. */
  717.             fprintf(stderr, "NCAC_done_request: wrong status of internal request\n");
  718.             ret = NCAC_REQ_STATUS_ERR;
  719.             return ret;
  720.     }
  721.  
  722.     /* prepare to return this request to the free list.
  723.      * We cannot just zero the ncac_req for all cases. 
  724.      * We want to reuse buffer inforation arrays to avoid
  725.      * allcations. */
  726.  
  727.     if ( ncac_req->reserved_cbufcnt == 0 ) {
  728.         id = ncac_req->id;
  729.         memset( ncac_req, 0, sizeof(struct NCAC_req) );
  730.         ncac_req->id = id;
  731.  
  732.     }else{ /* we want reuse buffer information arrays */
  733.         ncac_req->cbufcnt = 0;
  734.         ncac_req->mapping = 0;
  735.         ncac_req->ioreq   = INVAL_IOREQ;
  736.         ncac_req->read_out = 0;
  737.     }
  738.  
  739.     ncac_req->status  = NCAC_REQ_UNUSED; 
  740.  
  741.     NCAC_list_add_tail_lock( &ncac_req->list, &NCAC_dev.free_req_list, &NCAC_dev.req_list_lock); 
  742.  
  743.  
  744.     return ret;
  745. }
  746.  
  747. static inline struct inode *search_inode_list (PVFS_handle handle)
  748. {
  749.     int inode_index;
  750.     struct inode * cur;
  751.  
  752.     inode_index = handle % MAX_INODE_NUM;
  753.  
  754.     cur = inode_arr[inode_index]; 
  755.     while ( NULL != cur ) {
  756.         if ( cur->handle == handle ) return cur;    
  757.         cur = cur->next;
  758.     }    
  759.  
  760.     return NULL;
  761. }
  762.  
  763. /* get_inode: give a fs_id and a file handler, an inode-like structure
  764.  *            is allocated. Since handle is an arbitrary number, we should
  765.  *            have a mapping between this handler and the index of inode.
  766.  * get_inode should be called under some lock because two callers may
  767.  *     work on the same collision list.  
  768.  */
  769. static inline struct inode *get_inode(PVFS_fs_id coll_id, 
  770.                 PVFS_handle handle, PVFS_context_id context_id)
  771. {
  772.     struct inode *inode;
  773.     int inode_index;
  774.  
  775.     inode_index = handle % MAX_INODE_NUM;
  776.  
  777.     /* search the inode list with the index of "inode_index" */
  778.     inode = search_inode_list (handle);
  779.  
  780.     fprintf(stderr, "handle: %lld, index: %d, inode:%p\n", lld(handle), inode_index, inode);
  781.  
  782.     if ( NULL == inode ){
  783.         inode=(struct inode*)malloc(sizeof(struct inode));
  784.  
  785.         /* initialize it */
  786.         memset(inode, 0, sizeof(struct inode));
  787.  
  788.         inode->cache_stack = get_cache_stack();
  789.         inode->nrpages = 0;
  790.         inode->nr_dirty = 0;
  791.         inode->coll_id = coll_id;
  792.         inode->handle = handle;
  793.         inode->context_id = context_id;
  794.  
  795.         init_single_radix_tree(&inode->page_tree, NCAC_dev.get_value, NCAC_dev.max_b);
  796.  
  797.         spin_lock_init(&inode->lock);
  798.  
  799.         INIT_LIST_HEAD(&(inode->clean_pages));
  800.         INIT_LIST_HEAD(&(inode->dirty_pages));
  801.  
  802.         /* put the new inode to the head of the collision list */
  803.         inode->next = inode_arr[inode_index];
  804.         inode_arr[inode_index] = inode;
  805.     }
  806.  
  807.     return inode;
  808. }
  809.  
  810.  
  811.  
  812. static inline void extent_dump(struct extent *extent)
  813. {
  814.     fprintf(stderr, "flags:%x\t status:%d\t    index:%d\t\n", (int)extent->flags, extent->status, (int)extent->index);
  815.     fprintf(stderr, "writes:%d\t reads:%d\t    ioreq:%lld\t\n", extent->writes, extent->reads, lld(extent->ioreq));
  816.  
  817. }
  818.  
  819. static inline void list_dump(struct list_head *head)
  820. {
  821.     struct extent *page;
  822.     struct list_head *tmp;
  823.  
  824.     if (!list_empty(head)) {
  825.         tmp = head->next;
  826.         while (tmp!=head) {
  827.             page = list_entry(tmp, struct extent, lru);
  828.             fprintf(stderr, "extent: %p\t", page); 
  829.             extent_dump(page);
  830.  
  831.             tmp = tmp->next; 
  832.         } 
  833.     }
  834. }
  835.  
  836. void cache_dump_active_list(void)
  837. {
  838.    struct cache_stack *cache = get_cache_stack();
  839.  
  840.    fprintf(stderr, "active_list:\n");
  841.    list_dump(&cache->active_list);
  842. }
  843.  
  844. void cache_dump_inactive_list(void)
  845. {
  846.    struct cache_stack *cache = get_cache_stack();
  847.  
  848.    fprintf(stderr, "inactive_list:\n");
  849.    list_dump(&cache->inactive_list);
  850. }
  851.  
  852. static inline void req_list_dump(struct list_head *head)
  853. {
  854.     NCAC_req_t *req;
  855.     struct list_head *tmp;
  856.  
  857.     if (!list_empty(head)) {
  858.         tmp = head->next;
  859.         while (tmp!=head) {
  860.             req = list_entry(tmp, NCAC_req_t, list);
  861.             fprintf(stderr, "req: %p\t", req); 
  862.             //req_dump(req);
  863.  
  864.             tmp = tmp->next; 
  865.         } 
  866.     }
  867.     fprintf(stderr, "\n"); 
  868. }
  869.  
  870. static void cmp_list_dump(void) __attribute__((unused));
  871. static void cmp_list_dump(void)
  872. {
  873.    fprintf(stderr, "cmp list:    ");
  874.    req_list_dump(&NCAC_dev.comp_list);
  875. }
  876.  
  877. static void job_list_dump(void) __attribute__((unused));
  878. static void job_list_dump(void)
  879. {
  880.    fprintf(stderr, "job list:    ");
  881.    req_list_dump(&NCAC_dev.prepare_list);
  882. }
  883.  
  884. static inline void list_dump_list(struct list_head *head)
  885. {
  886.     struct extent *page;
  887.     struct list_head *tmp;
  888.  
  889.     if (!list_empty(head)) {
  890.         tmp = head->next;
  891.         while (tmp!=head) {
  892.             page = list_entry(tmp, struct extent, list);
  893.             fprintf(stderr, "extent: %p\t", page); 
  894.             extent_dump(page);
  895.  
  896.             tmp = tmp->next; 
  897.         } 
  898.     }    
  899. }
  900.  
  901. static void dirty_list_dump(int handle) __attribute__((unused));
  902. static void dirty_list_dump(int handle)
  903. {
  904.     struct inode *inode;
  905.  
  906.     inode = inode_arr[handle];
  907.  
  908.     list_dump_list(&inode->dirty_pages);
  909. }
  910.