home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / io / buffer / ncac-trove.c < prev    next >
C/C++ Source or Header  |  2008-11-19  |  11KB  |  403 lines

  1.  
  2. /* this file contains functions used by the cache to access Trove */
  3.  
  4. #include "internal.h"
  5. #include "trove.h"
  6. #include "aiovec.h"
  7. #include "ncac-trove.h"
  8. #include "state.h"
  9. #include "pvfs2-internal.h"
  10.  
  11. static inline void  offset_shorten( int s_cnt, 
  12.                       PVFS_offset *stream_offset_array,
  13.                       PVFS_size *stream_size_array,
  14.                       int m_cnt, 
  15.                       char **mem_offset_array,
  16.                       PVFS_size *mem_size_array,
  17.                       int *new_s_cnt, int *new_m_cnt);
  18.  
  19.  
  20. int NCAC_aio_read_ext( PVFS_fs_id coll_id, PVFS_handle handle, 
  21.                PVFS_context_id context, struct aiovec *aiovec, 
  22.                int *ioreq)
  23. {
  24.     char **mem_offset_array;
  25.     PVFS_size *mem_size_array;
  26.     PVFS_size off;
  27.     int m_cnt;
  28.     PVFS_offset *stream_offset_array;
  29.     PVFS_size *stream_size_array;
  30.     void *user_ptr_array[1] = { (char *) 13 };
  31.  
  32.     TROVE_op_id op_id;
  33.     TROVE_size output_size;
  34.  
  35.     int s_cnt;
  36.     int ret;
  37.  
  38.     int i;
  39.  
  40.     m_cnt = s_cnt = aiovec_count(aiovec);
  41.  
  42.     if ( !m_cnt ) {
  43.         *ioreq = INVAL_IOREQ;
  44.         return -1;
  45.     }
  46.  
  47.     mem_offset_array = aiovec->mem_offset_array;
  48.     mem_size_array   = aiovec->mem_size_array;
  49.  
  50.     stream_offset_array = aiovec->stream_offset_array;
  51.     stream_size_array   = aiovec->stream_size_array;
  52.  
  53.     /* for debug only */
  54.  
  55.     DPRINT("NCAC_aio_read_ext: info\n");
  56.  
  57.     for (i=0; i< m_cnt; i++ ) {
  58.         DPRINT( "off:%lld, size=%lld, buff:%p, size=%lld\n", 
  59.                 stream_offset_array[i], stream_size_array[i],
  60.                  mem_offset_array[i], mem_size_array[i] );
  61.     }
  62.  
  63.  
  64.     /* adjust the offset and length to the I/O unit.
  65.      * In the current implementation, we read/write over the whole extent. 
  66.      * At the same time, shorten the iovec list if possible. 
  67.      */
  68.  
  69.  
  70.     for (i=0; i< m_cnt; i++ ) {
  71.         if ( stream_size_array[i] % NCAC_dev.extsize ) {
  72.             stream_size_array[i] = NCAC_dev.extsize;
  73.             mem_size_array[i]    = NCAC_dev.extsize;
  74.  
  75.             off = stream_offset_array[i] & (NCAC_dev.extsize -1) ;
  76.  
  77.             stream_offset_array[i] -= off;
  78.             mem_offset_array[i] -= off;
  79.         }
  80.  
  81.         DPRINT( "off:%lld, size=%lld, buff:%p, size=%lld\n", 
  82.                 stream_offset_array[i], stream_size_array[i],
  83.                  mem_offset_array[i], mem_size_array[i] );
  84.  
  85.     }
  86.  
  87.     offset_shorten( s_cnt, 
  88.                     stream_offset_array,
  89.                     stream_size_array,
  90.                     m_cnt, 
  91.                     mem_offset_array,
  92.                     mem_size_array,
  93.                     &s_cnt, &m_cnt);
  94.  
  95.     DPRINT("--------------after offset_shorten: s_cnt=%d\n", s_cnt);
  96.     for (i=0; i< s_cnt; i++ ) {
  97.         DPRINT( "off:%lld, size=%lld\n", 
  98.                 stream_offset_array[i], stream_size_array[i] );
  99.     }
  100.  
  101.     DPRINT("--------------after offset_shorten: m_cnt=%d\n", m_cnt);
  102.     for (i=0; i< m_cnt; i++ ) {
  103.         DPRINT( "buff:%p, size=%lld\n", 
  104.                 mem_offset_array[i], mem_size_array[i] );
  105.     }
  106.  
  107.     ret = trove_bstream_write_list(coll_id,
  108.                                   handle,
  109.                                   mem_offset_array,
  110.                                   mem_size_array,
  111.                                   m_cnt,
  112.                                   stream_offset_array,
  113.                                   stream_size_array,
  114.                                   s_cnt,
  115.                                   &output_size,
  116.                                   0, /* flags */
  117.                                   NULL, /* vtag */
  118.                                   user_ptr_array,
  119.                                   context,
  120.                                   &op_id, NULL);
  121.  
  122.     if (ret < 0) {
  123.         NCAC_error("trove listio read failed\n");
  124.         return -1;
  125.     }
  126.  
  127.     *ioreq = op_id;
  128.  
  129.     DPRINT("NCAC_aio_read_ext: io request=%lld(%d)\n", op_id, *ioreq);
  130.  
  131.     return 0;
  132. }
  133.  
  134. int NCAC_aio_write( PVFS_fs_id coll_id, 
  135.                     PVFS_handle handle,
  136.                     PVFS_context_id context,
  137.                      int cnt,
  138.                     PVFS_offset *stream_offset_array, 
  139.                     PVFS_size *stream_size_array,
  140.                     char **mem_offset_array,  
  141.                     PVFS_size *mem_size_array, 
  142.                     int *ioreq) 
  143. {
  144.     int m_cnt, s_cnt;
  145.     PVFS_size off;
  146.     void *user_ptr_array[1] = { (char *) 13 };
  147.     TROVE_op_id op_id;
  148.     TROVE_size output_size;
  149.     int ret;
  150.     int i;
  151.  
  152.     DPRINT("NCAC_aio_write: info\n");
  153.  
  154.     m_cnt = s_cnt = cnt;
  155.  
  156.  
  157.     /* adjust the offset and length to the I/O unit.
  158.      * In the current implementation, we read/write over the whole extent. 
  159.      * At the same time, shorten the iovec list if possible. 
  160.      */
  161.  
  162.     for (i=0; i< m_cnt; i++ ) {
  163.         if ( stream_size_array[i] % NCAC_dev.extsize ) {
  164.             stream_size_array[i] = NCAC_dev.extsize;
  165.             mem_size_array[i]    = NCAC_dev.extsize;
  166.  
  167.             off = stream_offset_array[i] & (NCAC_dev.extsize -1) ;
  168.  
  169.             stream_offset_array[i] -= off;
  170.             mem_offset_array[i] -= off;
  171.         }
  172.  
  173.         DPRINT( "off:%lld, size=%lld, buff:%p, size=%lld\n", 
  174.                 stream_offset_array[i], stream_size_array[i],
  175.                  mem_offset_array[i], mem_size_array[i] );
  176.  
  177.     }
  178.  
  179.     offset_shorten( s_cnt, 
  180.                     stream_offset_array,
  181.                     stream_size_array,
  182.                     m_cnt, 
  183.                     mem_offset_array,
  184.                     mem_size_array,
  185.                     &s_cnt, &m_cnt);
  186.  
  187.  
  188. #ifdef DEBUG
  189.     fprintf(stderr, "--------------after offset_shorten: s_cnt=%d\n", s_cnt);
  190.     for (i=0; i< s_cnt; i++ ) {
  191.         fprintf(stderr, "off:%lld, size=%lld\n", 
  192.                 stream_offset_array[i], stream_size_array[i] );
  193.     }
  194.  
  195.     fprintf(stderr, "--------------after offset_shorten: m_cnt=%d\n", m_cnt);
  196.     for (i=0; i< m_cnt; i++ ) {
  197.         fprintf(stderr, "buff:%p, size=%lld\n", 
  198.                 mem_offset_array[i], mem_size_array[i] );
  199.     }
  200. #endif
  201.  
  202.     ret = trove_bstream_write_list(coll_id,
  203.                                   handle,
  204.                                   mem_offset_array,
  205.                                   mem_size_array,
  206.                                   m_cnt,
  207.                                   stream_offset_array,
  208.                                   stream_size_array,
  209.                                   s_cnt,
  210.                                   &output_size,
  211.                                   0, /* flags */
  212.                                   NULL, /* vtag */
  213.                                   user_ptr_array,
  214.                                   context,
  215.                                   &op_id, NULL);
  216.  
  217.     if (ret < 0) {
  218.         NCAC_error("trove listio read failed\n");
  219.         return -1;
  220.     }
  221.  
  222.     *ioreq = op_id;
  223.  
  224.     DPRINT("NCAC_aio_write: io request=%lld(%d)\n", op_id, *ioreq);
  225.  
  226.     return 0;
  227. }
  228.  
  229. /* do read for read modity write.
  230.  * In the current implementation, we read the whole extent. But the input
  231.  * parameters can be used to do finer read.
  232.  */
  233. int do_read_for_rmw(PVFS_fs_id coll_id, PVFS_handle handle, 
  234.                     PVFS_context_id context, struct extent *extent, 
  235.                     PVFS_offset pos, char * off, int size, int *ioreq)
  236. {
  237.     char * buf;
  238.     TROVE_size inout_size;
  239.     TROVE_op_id op_id;
  240.     
  241.     int ret;
  242.  
  243.     buf = extent->addr;
  244.     inout_size = NCAC_dev.extsize;
  245.  
  246.     DPRINT("do_read_for_rmw; pos=%lld, buf=%p, size=%lld\n", pos, buf, inout_size);
  247.     ret = trove_bstream_read_at(coll_id, handle,
  248.                                 buf, &inout_size,
  249.                                 0, 0, NULL, NULL,
  250.                                 context, &op_id, NULL);
  251.  
  252.     DPRINT("do_read_for_rmw; req=%lld\n", op_id);
  253.  
  254.     *ioreq = op_id;
  255.     return ret;
  256. }
  257.  
  258.  
  259.  
  260. /* NCAC_check_ioreq(): check pending Trove request on an extent.
  261.  * Since a list of extents may be associated with one Trove request,
  262.  * and Trove completion is one time, when a completion occurs, the
  263.  * status of all extents should be changed. This is done by a link
  264.  * of extent->ioreq_next. Using ioreq_list allows us to have
  265.  * more than one Trove requests for a job.
  266.  */
  267.  
  268. int NCAC_check_ioreq(struct extent *extent)
  269. {
  270.     TROVE_op_id  op_id;
  271.     TROVE_coll_id coll_id;
  272.     TROVE_context_id context_id;
  273.     TROVE_ds_state state;
  274.     int count;
  275.     int ret;
  276.  
  277.     op_id = extent->ioreq;
  278.  
  279.     if ( op_id == INVAL_IOREQ ) {
  280.         NCAC_error("invalid trove io req id");    
  281.         return -1;
  282.     }
  283.  
  284.     coll_id = extent->mapping->coll_id;
  285.     context_id = extent->mapping->context_id;
  286.  
  287.     DPRINT("NCAC_check_ioreq: req=%lld, coll_id=%d, context=%d, index=%ld\n", op_id, coll_id, context_id, extent->index);
  288.  
  289.     ret = trove_dspace_test(coll_id, op_id, context_id, &count, NULL, NULL, &state, TROVE_DEFAULT_TEST_TIMEOUT);
  290.  
  291.     if ( ret > 0 ) {
  292.         fprintf(stderr, "++++++++++++NCAC_check_ioreq: finished %lld\n", lld(op_id));
  293.         extent->ioreq = INVAL_IOREQ;
  294.     }
  295.  
  296.     return ret;
  297. }
  298.  
  299.  
  300. /*
  301.  * Reduce <file offset, len> and <mem offset, len>  pairs in-place.  
  302.  */
  303.  
  304. static inline void  offset_shorten( int s_cnt, 
  305.                       PVFS_offset *stream_offset_array,
  306.                       PVFS_size *stream_size_array,
  307.                       int m_cnt, 
  308.                       char **mem_offset_array,
  309.                       PVFS_size *mem_size_array,
  310.                       int *new_s_cnt, int *new_m_cnt)
  311. {
  312.  
  313.     int i = 0;
  314.     int seg = 0;
  315.  
  316.     /* shorten stream offset */
  317.     while ( i < s_cnt - 1 ) {
  318.         if ( stream_offset_array[seg] + stream_size_array[seg] ==
  319.             stream_offset_array[i+1] ) {
  320.  
  321.                 stream_size_array[seg] += stream_size_array[i+1];
  322.  
  323.                 stream_size_array[i+1] = 0;
  324.         }else {
  325.             seg = i+1;
  326.         }
  327.         i ++ ;
  328.     }
  329.  
  330.     i = 1; seg = 1;
  331.     while ( i < s_cnt ) {
  332.         if ( stream_size_array[i] != 0 ) {
  333.             if ( i != seg ) {
  334.                 stream_size_array[seg] = stream_size_array[i];
  335.                 stream_offset_array[seg] = stream_offset_array[i];
  336.             }
  337.             seg ++;
  338.         }
  339.         i++;
  340.     }
  341.  
  342.     *new_s_cnt = seg;
  343.  
  344.  
  345.     i = 0;
  346.     seg = 0;
  347.  
  348.     /* shorten stream offset */
  349.     while ( i < m_cnt - 1 ) {
  350.         if ( mem_offset_array[seg] + mem_size_array[seg] ==
  351.              mem_offset_array[i+1] ) {
  352.  
  353.                 mem_size_array[seg] += mem_size_array[i+1];
  354.  
  355.                 mem_size_array[i+1] = 0;
  356.         }else {
  357.             seg = i+1;
  358.         }
  359.         i ++ ;
  360.     }
  361.  
  362.     i = 1; seg = 1;
  363.     while ( i < s_cnt ) {
  364.         if ( mem_size_array[i] != 0 ) {
  365.             if ( i != seg ) {
  366.                 mem_size_array[seg] = mem_size_array[i];
  367.                 mem_offset_array[seg] = mem_offset_array[i];
  368.             }
  369.             seg ++;
  370.         }
  371.         i++;
  372.     }
  373.  
  374.     *new_m_cnt = seg;
  375.  
  376.     return;
  377. }
  378.  
  379. int init_io_read( PVFS_fs_id coll_id, PVFS_handle handle, 
  380.         PVFS_context_id context, PVFS_offset foffset, 
  381.         PVFS_size size, void *buf, TROVE_op_id *ioreq)
  382. {
  383.     void *user_ptr_array[1] = { (char *) 13 };
  384.     int ret;
  385.  
  386.     ret = trove_bstream_read_at(coll_id,
  387.                                handle,
  388.                                buf,
  389.                                 &size,
  390.                                 foffset,
  391.                                   0, /* flags */
  392.                                   NULL, /* vtag */
  393.                                   user_ptr_array,
  394.                                   context,
  395.                                   ioreq, NULL);
  396.  
  397.     if (ret < 0) {
  398.         NCAC_error("trove read at failed\n");
  399.         return -1;
  400.     }
  401.     return 0;
  402. }
  403.