home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / io / description / pint-request.c < prev    next >
C/C++ Source or Header  |  2009-04-30  |  33KB  |  976 lines

  1. /*
  2.  * (C) 2002 Clemson University and The University of Chicago.
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */       
  6.  
  7. #include <assert.h>
  8. #include <stdlib.h>
  9. #include <stdio.h>
  10. #include <string.h>
  11. #include <gossip.h>
  12. #include <pvfs2-debug.h>
  13. #include <pint-request.h>
  14. #include <pint-distribution.h>
  15. #include "pvfs2-internal.h"
  16.  
  17. static PVFS_offset PINT_request_disp(PINT_Request *request);
  18.  
  19. /* this macro is only used in this file to add a segment to the
  20.  * result list.
  21.  */
  22.  
  23. #define PINT_ADD_SEGMENT(result,offset,size,mode) \
  24. do { \
  25.     if (size > 0) \
  26.     { \
  27.         /* add a segment here */ \
  28.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\tprocess a segment\n"); \
  29.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\t\tof %lld sz %lld\n", lld(offset), lld(size)); \
  30.         if (PINT_IS_CKSIZE(mode)) \
  31.         { \
  32.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tcount segment in checksize\n"); \
  33.             result->segs++; \
  34.         } \
  35.         else if (result->segs > 0 && \
  36.                 result->offset_array[result->segs-1] + \
  37.                 result->size_array[result->segs-1] == offset) \
  38.         { \
  39.             /* combine adjacent segments */ \
  40.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tcombine a segment %d\n", result->segs-1); \
  41.             result->size_array[result->segs-1] += size; \
  42.         } \
  43.         else \
  44.         { \
  45.             /* add a segment */ \
  46.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tadd a segment %d\n", result->segs); \
  47.             result->offset_array[result->segs] = offset; \
  48.             result->size_array[result->segs] = size; \
  49.             result->segs++; \
  50.         } \
  51.         result->bytes += size; \
  52.     } \
  53. } while (0)
  54.  
  55. /* end of the PINT_ADD_SEGMENT macro */
  56.  
  57.  
  58. /* This function calls PVFS_Distribute for each contiguous chunk */
  59. /* of the request.  PVFS_Distribute returns the number of bytes */
  60. /* processed.  If this is less than the total bytes in the chunk */
  61. /* this function returns otherwise it keeps processing until all */
  62. /* chunks are done.  Returns 0 on success and -PVFS_error on failure.  The */
  63. /* number of bytes processed is stored in result->bytes.  It */
  64. /* is assumed caller we retry if this is less than the total bytes */
  65. /* in the request */
  66. int PINT_process_request(PINT_Request_state *req,
  67.     PINT_Request_state *mem,
  68.     PINT_request_file_data *rfdata,
  69.     PINT_Request_result *result,
  70.     int mode)
  71. {
  72.     void *temp_space = NULL;    /* temp copy of req state for size call */
  73.     PVFS_boolean lvl_flag;      /* indicates level should be decremented */
  74.     PVFS_offset  contig_offset = 0; /* temp for offset of a contig region */
  75.     PVFS_size    contig_size;   /* temp for size of a contig region */
  76.     PVFS_size    retval;        /* return value from calls to distribute */
  77.  
  78.     if (!PINT_IS_MEMREQ(mode))
  79.         gossip_debug(GOSSIP_REQUEST_DEBUG,
  80.             "=========================================================\n");
  81.     gossip_debug(GOSSIP_REQUEST_DEBUG,"PINT_process_request\n");
  82.     /* do very basic error checking here */
  83.     if (!req)
  84.     {
  85.         gossip_lerr("PINT_process_request: Bad PINT_Request_state!\n");
  86.         return -PVFS_EINVAL;
  87.     }
  88.     if (!result || !result->segmax || !result->bytemax)
  89.     {
  90.         gossip_lerr("PINT_process_request: NULL segmax or bytemax!\n");
  91.         return -PVFS_EINVAL;
  92.     }
  93.     if (result->segs >= result->segmax || result->bytes >= result->bytemax)
  94.     {
  95.         gossip_lerr("PINT_process_request: no segments or bytes requested!\n");
  96.         return -PVFS_EINVAL;
  97.     }
  98.     if (!PINT_IS_CKSIZE(mode) && (!result->offset_array || !result->size_array))
  99.     {
  100.         gossip_lerr("PINT_process_request: NULL offset or size array!\n");
  101.         return -PVFS_EINVAL;
  102.     }
  103.     /* initialize some variables */
  104.     retval = 0;
  105.     if (PINT_EQ_CKSIZE(mode)) /* be must be exact here */
  106.     {
  107.         /* request for a size check - do not alter request state */
  108.         gossip_debug(GOSSIP_REQUEST_DEBUG,
  109.                 "\tsize request - copying state, hold on to your hat! dp %d\n",
  110.                 req->cur->rqbase->depth);
  111.         temp_space = (void *)malloc(sizeof(PINT_Request_state)+
  112.                 (sizeof(PINT_reqstack)*req->cur->rqbase->depth));
  113.                 if(!temp_space)
  114.                 {
  115.                    return -PVFS_ENOMEM;
  116.                 }
  117.  
  118.         memcpy(temp_space,req,sizeof(PINT_Request_state));
  119.         req = (PINT_Request_state *)temp_space;
  120.         memcpy(((char *)temp_space) + sizeof(PINT_Request_state),
  121.                 req->cur,(sizeof(PINT_reqstack)*req->cur->rqbase->depth));
  122.         req->cur = (PINT_reqstack *)
  123.             (((char *)temp_space) + sizeof(PINT_Request_state));
  124.     }
  125.     /* check to see if we are picking up where we left off */
  126.     if (req->lvl < 0)
  127.     {
  128.         gossip_debug(GOSSIP_REQUEST_DEBUG,
  129.                 "\tRequest state level < 0 - resetting request state\n");
  130.         /* reinitialize the request state to zero */
  131.         PINT_REQUEST_STATE_RST(req);
  132.     }
  133.     /* automatically set final_offset of req based on mem size */
  134.     if (PINT_IS_CLIENT(mode) && mem && req->final_offset == 0)
  135.     {
  136.         req->final_offset = req->target_offset +
  137.                 mem->cur[0].rqbase->aggregate_size;
  138.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\tsetting final offset %lld\n",
  139.                 lld(req->final_offset));
  140.     }
  141.     /* automatically tile the req */
  142.     if (!PINT_IS_MEMREQ(mode))
  143.     {
  144.         int64_t count;
  145.         if (req->cur[0].rqbase)
  146.         {
  147.             count = req->final_offset / req->cur[0].rqbase->aggregate_size;
  148.         }
  149.         else
  150.         {
  151.             count = req->final_offset;
  152.         }
  153.         req->cur[0].maxel = count + 1;
  154.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\ttiling %lld copies\n", lld(count+1));
  155.     }
  156.     /* deal with skipping over some bytes (type offset) */
  157.     if (req->target_offset > req->type_offset)
  158.     {
  159.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\tskipping ahead to target_offset\n");
  160.         /* find starting offset in request structure */
  161.         PINT_SET_LOGICAL_SKIP(mode);
  162.     }
  163.     else
  164.     {
  165.         /* do we allow external setting of LOGICAL_SKIP */
  166.         /* what about backwards skipping, as in seeking? */
  167.         }
  168.     
  169.     /* we should be ready to begin */
  170.     /* zero retval indicates everything flowing successfully */
  171.     /* positive retval indicates a partial chunk was processed - so we */
  172.     /* wait until later to retry */
  173.     while(!retval)
  174.     {
  175.         if (req->cur[req->lvl].rq)
  176.         {
  177.         /* print the current state of the decoding process */
  178.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\tDo seq of %lld ne %d st %lld nb %d "
  179.         "ub %lld lb %lld as %lld co %llu\n",
  180.                 lld(req->cur[req->lvl].rq->offset), req->cur[req->lvl].rq->num_ereqs,
  181.                 lld(req->cur[req->lvl].rq->stride), req->cur[req->lvl].rq->num_blocks,
  182.                 lld(req->cur[req->lvl].rq->ub), lld(req->cur[req->lvl].rq->lb),
  183.                 lld(req->cur[req->lvl].rq->aggregate_size),
  184.                 lld(req->cur[req->lvl].chunk_offset));
  185.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\tlvl %d el %lld blk %d by %lld\n",
  186.                 req->lvl, lld(req->cur[req->lvl].el), req->cur[req->lvl].blk,
  187.                 lld(req->bytes));
  188.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\tto %lld ta %lld fi %lld\n",
  189.                 lld(req->type_offset), lld(req->target_offset),
  190.                 lld(req->final_offset));
  191.                if (mem) /* if a mem type is specified print its state */
  192.                {
  193.                 gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\tmto %lld mta %lld mfi %lld\n",
  194.                        lld(mem->type_offset), lld(mem->target_offset),
  195.                        lld(mem->final_offset));
  196.                }
  197.          }
  198.         /* NULL type indicates packed data - handle directly */
  199.         if (req->cur[req->lvl].rq == NULL)
  200.         {
  201.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tnull type\n");
  202.             contig_offset = req->cur[req->lvl].chunk_offset + req->bytes;
  203.             contig_size = req->cur[req->lvl].maxel - req->bytes;
  204.             lvl_flag = 1;
  205.         }
  206.         /* basic data type or contiguous data - handle directly */
  207.         /* NULL ereq indicates current type is packed bytes */
  208.         /* current type is contiguous because its size equals extent */
  209.         /* AND the num_contig_chunks is 1 */
  210.         else if ((req->cur[req->lvl].rq->ereq == NULL ||
  211.                 (req->cur[req->lvl].rq->aggregate_size ==
  212.                 (req->cur[req->lvl].rqbase->ub -
  213.                     req->cur[req->lvl].rqbase->lb) &&
  214.                 req->cur[req->lvl].rq->ereq->num_contig_chunks == 1)) &&
  215.                 req->cur[req->lvl].rq == req->cur[req->lvl].rqbase)
  216.         {
  217.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tbasic type or contiguous data\n");
  218.             contig_offset = req->cur[req->lvl].rq->offset +
  219.                     req->cur[req->lvl].chunk_offset + req->bytes +
  220.                     PINT_request_disp(req->cur[req->lvl].rq);
  221.             contig_size = (req->cur[req->lvl].maxel *
  222.                     req->cur[req->lvl].rq->aggregate_size) - req->bytes;
  223.             lvl_flag = 1;
  224.         }
  225.         /* subtype is contiguous because its size equals its extent */
  226.         /* AND the num_contig_chunks is 1 */
  227.         else if (req->cur[req->lvl].rq->ereq->aggregate_size ==
  228.                 (req->cur[req->lvl].rq->ereq->ub -
  229.                 req->cur[req->lvl].rq->ereq->lb) &&
  230.                 req->cur[req->lvl].rq->ereq->num_contig_chunks == 1)
  231.         {
  232.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tsubtype is contiguous\n");
  233.             contig_offset = req->cur[req->lvl].chunk_offset +
  234.                 (req->cur[req->lvl].el * (req->cur[req->lvl].rqbase->ub -
  235.                               req->cur[req->lvl].rqbase->lb)) +
  236.                 req->cur[req->lvl].rq->offset + (req->cur[req->lvl].rq->stride *
  237.                         req->cur[req->lvl].blk) + req->bytes +
  238.                 PINT_request_disp(req->cur[req->lvl].rq);
  239.             contig_size = (req->cur[req->lvl].rq->ereq->aggregate_size *
  240.                     req->cur[req->lvl].rq->num_ereqs) - req->bytes;
  241.             lvl_flag = 0;
  242.         }
  243.         /* go to the next level and "recurse" */
  244.         else
  245.         {
  246.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tgoing to next level %d\n",req->lvl+1);
  247.             if (!req->cur[req->lvl].rq->ereq ||
  248.                     req->lvl+1 >= req->cur[0].rqbase->depth)
  249.             {
  250.                 gossip_lerr("PINT_process_request exceeded request depth - possibly corrupted request or request state\n");
  251.                 return -PVFS_EINVAL;
  252.             }
  253.             req->cur[req->lvl+1].el = 0;
  254.             req->cur[req->lvl+1].maxel = req->cur[req->lvl].rq->num_ereqs;
  255.             req->cur[req->lvl+1].rq = req->cur[req->lvl].rq->ereq;
  256.             req->cur[req->lvl+1].rqbase = req->cur[req->lvl].rq->ereq;
  257.             req->cur[req->lvl+1].blk = 0;
  258.             req->cur[req->lvl+1].chunk_offset = req->cur[req->lvl].chunk_offset +
  259.                     (req->cur[req->lvl].el * (req->cur[req->lvl].rqbase->ub -
  260.                     req->cur[req->lvl].rqbase->lb)) + req->cur[req->lvl].rq->offset +
  261.                     (req->cur[req->lvl].rq->stride * req->cur[req->lvl].blk);
  262.             req->lvl++;
  263.             continue;
  264.         }
  265.         gossip_debug(GOSSIP_REQUEST_DEBUG,
  266.                 "\tcontig_offset = %lld contig_size = %lld lvl_flag = %d\n",
  267.                 lld(contig_offset), lld(contig_size), lvl_flag);
  268.         /* set this up for client processing */
  269.         if (PINT_IS_CLIENT(mode))
  270.         {
  271.             /* The type_offset of the mem type and the req type should
  272.              * track each other as the request is processed on the client
  273.              * The value of the offset_array is used to set the mem target_offset
  274.              * in the distribute routine, so we set it here to the type_offset of
  275.              * the req - WBL
  276.              */
  277.             result->offset_array[result->segs] = req->type_offset - req->target_offset;
  278.         }
  279.         /*** BEFORE CALLING DISTRIBUTE ***/
  280.         if (PINT_IS_LOGICAL_SKIP(mode))
  281.         {
  282.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tprocess logical skip\n");
  283.             if (req->type_offset + contig_size >= req->target_offset)
  284.             {
  285.                 /* this contig chunk will exceed the target start offset */
  286.                 retval = req->target_offset - req->type_offset;
  287.             }
  288.             else
  289.             {
  290.                 /* need to skip this whole block */
  291.                 retval = contig_size;
  292.             }
  293.             /* does this need to be here - or should it be elsewhere */
  294.             req->eof_flag = (rfdata->fsize <= req->type_offset) &&
  295.                 !(rfdata->extend_flag);
  296.         }
  297.         /*** CALLING DISTRIBUTE - OR WHATEVER ***/
  298.         else /* not logical skip or seeking */
  299.         {
  300.             PVFS_size sz = contig_size; /* don't modify contig_size here */
  301.             /* stop at final offset */
  302.             if (req->type_offset + sz > req->final_offset)
  303.             {
  304.                 sz = req->final_offset - req->type_offset;
  305.             }
  306.             /* memreq mode doesn't do distribution */
  307.             if (PINT_IS_MEMREQ(mode))
  308.             {
  309.                 /* check for too many bytes or segs */
  310.                 if (result->bytes + sz >= result->bytemax )
  311.                 {
  312.                     sz = result->bytemax - result->bytes;
  313.                 }
  314.                 PINT_ADD_SEGMENT(result, contig_offset, sz, mode);
  315.                 retval = sz;
  316.             }
  317.             else
  318.             {
  319.                 /* we process the whole thing at once */
  320.                 gossip_debug(GOSSIP_REQUEST_DEBUG,
  321.                                              "\tcalling distribute\n");
  322.                 retval = PINT_distribute(contig_offset, sz,
  323.                                                          rfdata, mem, result,
  324.                                                          &req->eof_flag, mode);
  325.  
  326.                                 if (-1 == retval)
  327.                                 {
  328.                                     gossip_debug(GOSSIP_REQUEST_DEBUG,
  329.                                                  "\tDistribute returned -1\n");
  330.                                     req->type_offset = req->final_offset;
  331.                                     result->segs = 0;
  332.                                     result->bytes = 0;
  333.                                     return 0;
  334.                                 }
  335.             }
  336.         }
  337.         /*** AFTER CALLING DISTRIBUTE ***/
  338.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\tretval = %lld\n", lld(retval));
  339.         req->type_offset += retval;
  340.         /* see if we processed all of the bytes expected */
  341.         if (retval != contig_size)
  342.         {
  343.             /* no so record the bytes processed */
  344.             req->bytes += retval;
  345.             if (PINT_IS_LOGICAL_SKIP(mode))
  346.             {
  347.                 /* now starting processing for real */
  348.                 PINT_CLR_LOGICAL_SKIP(mode);
  349.                 retval = 0; /* keeps the loop going */
  350.                 gossip_debug(GOSSIP_REQUEST_DEBUG,
  351.                         "\texiting logical skip because distribute indicates done\n");
  352.                 continue;
  353.             }
  354.             else
  355.             {
  356.                 /* all we can do for now get outta here */
  357.                 gossip_debug(GOSSIP_REQUEST_DEBUG,
  358.                         "\texiting distribute returned less than expected\n");
  359.                 break;
  360.             }
  361.         }
  362.         /* processed all bytes so continue on and return from level */
  363.         if (lvl_flag)
  364.         {
  365.             req->lvl--;
  366.         }
  367.  
  368.     return_from_level:
  369.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\treturn from level %d\n",req->lvl);
  370.         retval = 0;
  371.         req->bytes = 0;
  372.         if (req->lvl < 0)
  373.         {
  374.             /* we have processed the entire request */
  375.             break;
  376.         }
  377.         /* go to the next block */
  378.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\tgoing to next block\n");
  379.         req->cur[req->lvl].blk++;
  380.         if (req->cur[req->lvl].blk >= req->cur[req->lvl].rq->num_blocks)
  381.         {
  382.             /* that was the last block */
  383.             req->cur[req->lvl].blk = 0;
  384.             /* go to next item in sequence chain */
  385.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tgoing to next item in sequence chain\n");
  386.             req->cur[req->lvl].rq = req->cur[req->lvl].rq->sreq;
  387.             if (req->cur[req->lvl].rq == NULL)
  388.             {
  389.                 /* that was last item in sequence chain */
  390.                 req->cur[req->lvl].rq = req->cur[req->lvl].rqbase;
  391.                 /* go to next element in block of level above */
  392.                 gossip_debug(GOSSIP_REQUEST_DEBUG,
  393.                         "\tgoing to next element in block of level above\n");
  394.                 req->cur[req->lvl].el++;
  395.                 if (req->cur[req->lvl].el >= req->cur[req->lvl].maxel)
  396.                 {
  397.                     /* that was last element in block of level above */
  398.                     req->lvl--;
  399.                     /* go back up a level */
  400.                     goto return_from_level;
  401.                 }
  402.             }
  403.         }
  404.         /* check to see if we are finished */
  405.         if (result->bytes == result->bytemax ||
  406.                 (!PINT_IS_CKSIZE(mode) && (result->segs == result->segmax)))
  407.         {
  408.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tran out of segments or bytes\n");
  409.             break;
  410.         }
  411.         /* look for end of request */
  412.         if (req->type_offset >= req->final_offset)
  413.         {
  414.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\tend of the request\n");
  415.             break;
  416.         }
  417.     } /* this is the end of the while loop */
  418.     gossip_debug(GOSSIP_REQUEST_DEBUG,"\tdone sg %d sm %d by %lld bm %lld ta %lld to %lld fo %lld eof %d\n",
  419.             result->segs, result->segmax, lld(result->bytes), lld(result->bytemax),
  420.             lld(req->target_offset), lld(req->type_offset), lld(req->final_offset),
  421.             req->eof_flag);
  422.     if (PINT_EQ_CKSIZE(mode)) /* must be exact here */
  423.     {
  424.         /* restore request state */
  425.         free(temp_space);
  426.     }
  427.     if (!PINT_IS_MEMREQ(mode))
  428.         gossip_debug(GOSSIP_REQUEST_DEBUG,
  429.             "=========================================================\n");
  430.     return 0;
  431. }
  432.  
  433. /* this function runs down the ereq list and adds up the offsets */
  434. /* present in the request records */
  435. static PVFS_offset PINT_request_disp(PINT_Request *request)
  436. {
  437.     PVFS_offset disp = 0;
  438.     PINT_Request *r;
  439.     gossip_debug(GOSSIP_REQUEST_DEBUG,"\tRequest disp\n");
  440.     for (r = request->ereq; r; r = r->ereq)
  441.     {
  442.         disp += r->offset;
  443.     }
  444.     return disp;
  445. }
  446.  
  447. /* This function creates a request state and sets it up to begin */
  448. /* processing a request */
  449. struct PINT_Request_state *PINT_new_request_state(PINT_Request *request)
  450. {
  451.     return PINT_new_request_states(request, 1);
  452. }
  453.  
  454. struct PINT_Request_state *PINT_new_request_states(PINT_Request *request, int n)
  455. {
  456.     struct PINT_Request_state *reqs;
  457.     int rqdepth, i;
  458.  
  459.     gossip_debug(GOSSIP_REQUEST_DEBUG, "%s n=%d\n", __func__, n);
  460.  
  461.     /* we assume null request is a contiguous byte range depth 1 */
  462.     if (request)
  463.     {
  464.         rqdepth = request->depth;
  465.     }
  466.     else
  467.     {
  468.         rqdepth = 1;
  469.     }
  470.  
  471.     reqs = malloc(n * (sizeof(*reqs) + rqdepth * sizeof(*reqs->cur)));
  472.     if (!reqs)
  473.     {
  474.         gossip_lerr("%s: malloc failed\n", __func__);
  475.         return NULL;
  476.     }
  477.  
  478.     for (i=0; i<n; i++)
  479.     {
  480.         reqs[i].cur = (void *) &reqs[n];
  481.         reqs[i].cur += i * rqdepth;
  482.  
  483.         reqs[i].lvl = 0;
  484.         reqs[i].bytes = 0;
  485.         reqs[i].type_offset = 0;
  486.         reqs[i].target_offset = 0;
  487.         reqs[i].final_offset = request->aggregate_size;
  488.         reqs[i].eof_flag = 0;
  489.  
  490.         reqs[i].cur[0].maxel = 1; /* transfer one instance of request */
  491.         reqs[i].cur[0].el = 0;
  492.         reqs[i].cur[0].rq = request;
  493.         reqs[i].cur[0].rqbase = request;
  494.         reqs[i].cur[0].blk = 0;
  495.         reqs[i].cur[0].chunk_offset = 0; /* transfer from inital file offset */
  496.     }
  497.  
  498.     return reqs;
  499. }
  500.  
  501. /* This function frees request state structures */
  502. void PINT_free_request_state(PINT_Request_state *req)
  503. {
  504.     free(req);
  505. }
  506.  
  507. void PINT_free_request_states(PINT_Request_state *reqs)
  508. {
  509.     free(reqs);
  510. }
  511.  
  512. /**
  513.  * Returns:
  514.  *     - If the distribute finds file data on the server, then the byte
  515.  *       displacement from the input argument offset to the last byte
  516.  *       in the segment processed regardless of whether that byte is in
  517.  *       the current distribution or not
  518.  *     - -1 if there is no distribution data available on the server
  519.  * Inputs:
  520.  *     - offset and size are the contiguous region in logical file
  521.  *     space we are to process
  522.  *     - segmax and segs are the maximum segments we can create and
  523.  *     the number created so far
  524.  *     - bytemax and bytes are the maximum number of bytes we can
  525.  *     process and the number processed so far
  526.  *     - offset array and size array are where we output segments
  527.  *     - extend flags indicates we should proceed past EOF
  528.  * Outputs:
  529.  *     - updates index, bytes, offset_array, size_array
  530.  *     - sets eof_flag if at end of file
  531.  *     - returns logical file space offset differential of last
  532.  *     byte processed
  533.  * When client flag is set
  534.  *     segment offsets are computed based on buffer offset in
  535.  *     offset_array[*segs]
  536.  */
  537. PVFS_size PINT_distribute(PVFS_offset offset,
  538.                           PVFS_size size,
  539.                           PINT_request_file_data *rfdata,
  540.                           PINT_Request_state *mem,
  541.                           PINT_Request_result *result,
  542.                           PVFS_boolean *eof_flag,
  543.                           int mode)
  544. {
  545.     PVFS_offset orig_offset;
  546.     PVFS_size   orig_size;
  547.     PVFS_offset loff;    /* next logical offset within requested region */
  548.     PVFS_offset diff;    /* difference between loff and offset of region */
  549.     PVFS_offset poff;    /* physical offste corresponding to loff */
  550.     PVFS_size   sz;      /* number of bytes in requested region after loff */
  551.     PVFS_size   fraglen; /* length of physical strip contiguous on server */
  552.     PVFS_size   retval;
  553.  
  554.     gossip_debug(GOSSIP_REQUEST_DEBUG,"\tPINT_distribute\n");
  555.     gossip_debug(GOSSIP_REQUEST_DEBUG,
  556.                  "\t\tof %lld sz %lld ix %d sm %d by %lld bm %lld "
  557.                  "fsz %lld exfl %d\n",
  558.                  lld(offset), lld(size), result->segs, result->segmax,
  559.                  lld(result->bytes),
  560.                  lld(result->bytemax),
  561.                  lld(rfdata->fsize), rfdata->extend_flag);
  562.     orig_offset = offset;
  563.     orig_size = size;
  564.     *eof_flag = 0;
  565.     
  566.     /* check if we have maxed out result */
  567.     if ((!PINT_IS_CKSIZE(mode) && (result->segs >= result->segmax)) ||
  568.         result->bytes >= result->bytemax || size == 0)
  569.     {
  570.         /* not an error, but we didn't process any bytes */
  571.         gossip_debug(GOSSIP_REQUEST_DEBUG,
  572.                      "\t\trequested zero segs or zero bytes\n");
  573.         return 0;
  574.     }
  575.     
  576.     /* verify some critical pointers */
  577.     if (!rfdata || !rfdata->dist || !rfdata->dist->methods ||
  578.         !rfdata->dist->params)
  579.     {
  580.         if (!rfdata)
  581.             gossip_debug(GOSSIP_REQUEST_DEBUG,"rfdata is NULL\n");
  582.         else if (!rfdata->dist)
  583.             gossip_debug(GOSSIP_REQUEST_DEBUG,"rfdata->dist is NULL\n");
  584.         else if (!rfdata->dist->methods)
  585.             gossip_debug(GOSSIP_REQUEST_DEBUG,"rfdata->dist->methods is NULL\n");
  586.         else if (!rfdata->dist->params)
  587.             gossip_debug(GOSSIP_REQUEST_DEBUG,"rfdata->dist->params is NULL\n");
  588.         gossip_lerr("Bad Distribution! Bailing out!\n");
  589.         return 0;
  590.     }
  591.     
  592.     /* find next logical offset on this server */
  593.     loff = (*rfdata->dist->methods->next_mapped_offset)(rfdata->dist->params,
  594.                                                         rfdata,
  595.                                                         offset);
  596.  
  597.     /* If there is no data on this server, immediately return */
  598.     if (-1 == loff)
  599.     {
  600.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\treturn, dist says no data\n");
  601.         return -1;
  602.     }
  603.     
  604.     /* make sure loff is still within requested region */
  605.     while ((diff = loff - offset) < size)
  606.     {
  607.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\tbegin iteration loff: %lld\n",
  608.                      lld(loff));
  609.         
  610.         /* find physical offset for this loff */
  611.         poff = (*rfdata->dist->methods->logical_to_physical_offset)
  612.             (rfdata->dist->params,
  613.              rfdata,
  614.              loff);
  615.         
  616.         /* find how much of requested region remains after loff */
  617.         sz = size - diff;
  618.         
  619.         /* find how much data after loff/poff is on this server */
  620.         fraglen = (*rfdata->dist->methods->contiguous_length)
  621.             (rfdata->dist->params,
  622.              rfdata,
  623.              poff);
  624.         
  625.         /* compare that amount to amount of data in requested region */
  626.         if (sz > fraglen && rfdata->server_ct != 1)
  627.         {
  628.             /* frag extends beyond this strip */
  629.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\tfrag extends beyond strip\n");
  630.             sz = fraglen;
  631.         }
  632.         /* check to see if exceeds bytemax */
  633.         if (result->bytes + sz > result->bytemax)
  634.         {
  635.             /* contiguous segment extends beyond byte limit */
  636.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\tsegment exceeds byte limit\n");
  637.             sz = result->bytemax - result->bytes;
  638.         }
  639.         /* check to se if exceeds end of file */
  640.         if (poff+sz > rfdata->fsize)
  641.         {
  642.             /* check for append */
  643.             if (rfdata->extend_flag)
  644.             {
  645.              /* update the file size info */
  646.                 gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\tfile being extended\n");
  647.                 rfdata->fsize = poff + sz;
  648.             }
  649.             else
  650.             {
  651.                 /* hit end of file */
  652.                 gossip_debug(GOSSIP_REQUEST_DEBUG,
  653.                              "\t\thit end of file: po %lld sz %lld fsz %lld\n",
  654.                              lld(poff), lld(sz), lld(rfdata->fsize));
  655.                 *eof_flag = 1;
  656.                 sz = rfdata->fsize - poff;
  657.                 if (sz <= 0)
  658.                 {
  659.                     /* not even any more bytes before EOF */
  660.                     gossip_debug(GOSSIP_REQUEST_DEBUG,
  661.                                  "\t\tend of file and no more bytes\n");
  662.                     break;
  663.                 }
  664.             }
  665.         }
  666.         /* process a segment */
  667.         if (PINT_IS_CLIENT(mode))
  668.         {
  669.             poff = result->offset_array[result->segs] + diff;
  670.             gossip_debug(GOSSIP_REQUEST_DEBUG,
  671.                          "\t\tclient lstof %lld diff %lld sgof %lld\n",
  672.                          lld(result->offset_array[result->segs]), lld(diff),
  673.                          lld(poff));
  674.         }
  675.         /* else poff is the offset of the segment */
  676.         if (PINT_IS_CLIENT(mode) && mem)
  677.         {
  678.             int current_segs = result->segs;
  679.  
  680.             gossip_debug(GOSSIP_REQUEST_DEBUG,
  681.                          "**********CALL***PROCESS*********\n");
  682.             gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\tsegment of %lld sz %lld\n",
  683.                          lld(poff), lld(sz));
  684.  
  685.             /* call request processor to decode request type */
  686.             /* sequential offset is offset_array[*segs] */
  687.             /* size is sz */
  688.             PINT_REQUEST_STATE_SET_TARGET(mem, poff);
  689.             PINT_REQUEST_STATE_SET_FINAL(mem, poff + sz);
  690.             PINT_process_request(mem, NULL, rfdata, result, mode|PINT_MEMREQ);
  691.             sz = mem->type_offset - poff;
  692.             
  693.             if(sz <= 0 && result->segs == current_segs)
  694.             {
  695.                 /* If there no new segments within the memory request, 
  696.                  * we don't need to post-process
  697.                  */
  698.                 break;
  699.             }
  700.             gossip_debug(GOSSIP_REQUEST_DEBUG,
  701.                          "*****RETURN***FROM***PROCESS*****\n");
  702.         }
  703.         else
  704.         {
  705.             PINT_ADD_SEGMENT(result, poff, sz, mode);
  706.         }
  707.         /* this is used by client code */
  708.         if (PINT_IS_CLIENT(mode) && result->segs < result->segmax)
  709.         {
  710.             result->offset_array[result->segs] =
  711.                 result->offset_array[result->segs - 1] + 
  712.                 result->size_array[result->segs - 1];
  713.         }
  714.         /* sz should never be zero or negative */
  715.         if (sz < 1)
  716.         {
  717.             gossip_lerr("Error in distribution processing!\n");
  718.             break;
  719.         }
  720.         /* prepare for next iteration */
  721.         loff  += sz;
  722.         size  -= loff - offset;
  723.         offset = loff;
  724.         /* find next logical offset on this server */
  725.         loff = (*rfdata->dist->methods->next_mapped_offset)
  726.             (rfdata->dist->params,
  727.              rfdata,
  728.              offset);
  729.         assert(-1 != loff);
  730.         
  731.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\tend iteration\n");
  732.         /* see if we are finished */
  733.         if (result->bytes >= result->bytemax ||
  734.             (!PINT_IS_CKSIZE(mode) && (result->segs >= result->segmax)))
  735.         {
  736.             gossip_debug(GOSSIP_REQUEST_DEBUG,
  737.                          "\t\tdone with segments or bytes\n");
  738.             break;
  739.         }
  740.     }
  741.     
  742.     gossip_debug(GOSSIP_REQUEST_DEBUG,
  743.                  "\t\t\tof %lld sz %lld sg %d sm %d by %lld bm %lld\n",
  744.                  lld(offset), lld(size), result->segs, result->segmax,
  745.                  lld(result->bytes),
  746.                  lld(result->bytemax));
  747.     
  748.     /* find physical offset for this loff */
  749.     poff = (*rfdata->dist->methods->logical_to_physical_offset)
  750.         (rfdata->dist->params, rfdata, loff);
  751.     
  752.     gossip_debug(GOSSIP_REQUEST_DEBUG,
  753.                  "\t\t\tnext loff: %lld next poff: %lld\n",
  754.                  lld(loff), lld(poff));
  755.     
  756.     if (poff >= rfdata->fsize && !rfdata->extend_flag)
  757.     {
  758.         /* end of file - thus end of request */
  759.         *eof_flag = 1;
  760.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\t\t[return value] %lld (EOF)\n",
  761.                      lld(orig_size));
  762.         retval = orig_size;
  763.     }
  764.     if (loff >= orig_offset + orig_size)
  765.     {
  766.         gossip_debug(GOSSIP_REQUEST_DEBUG,
  767.                      "\t\t\t(return value) %lld%s\n", lld(orig_size),
  768.                      *eof_flag ? " (EOF)" : "");
  769.         retval = orig_size;
  770.     }
  771.     else
  772.     {
  773.         gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\t\treturn value %lld%s\n",
  774.                      lld(offset - orig_offset), *eof_flag ? " (EOF)" : "");
  775.         retval = (offset - orig_offset);
  776.     }
  777.     gossip_debug(GOSSIP_REQUEST_DEBUG,"\t\tfinished\n");
  778.  
  779.     return retval;
  780. }
  781.  
  782. /* Function: PINT_Request_commit
  783.  * Objective: Write out the request tree to a contiguous
  784.  * region - return the offset of the next empty space in region
  785.  */
  786. /* Traverse the binary tree, pick up each request struct, modify its type
  787.  * and set the offsets in the ereq and sreq of the request struct. These
  788.  * offsets must reflect the current offsets and then write each struct 
  789.  * to the contiguous memory region
  790.  */
  791. int PINT_request_commit(PINT_Request *region, PINT_Request *node)
  792. {
  793.     int32_t index = 0;
  794.     PINT_do_request_commit(region, node, &index, 0);
  795.     return 0;
  796. }
  797.  
  798. int PINT_do_clear_commit(PINT_Request *node, int32_t depth)
  799. {
  800.     if (node == NULL)
  801.         return -1;
  802.  
  803.     if (!node->committed && depth > 0)
  804.         return 0;
  805.     
  806.     node->committed = 0;
  807.  
  808.     if (node->ereq)
  809.     {
  810.         PINT_do_clear_commit(node->ereq, depth+1);
  811.     }
  812.     if (node->sreq)
  813.     {
  814.         PINT_do_clear_commit(node->sreq, depth+1);
  815.     }
  816.     return 0;
  817. }
  818.  
  819. PINT_Request *PINT_do_request_commit(PINT_Request *region, PINT_Request *node,
  820.         int32_t *index, int32_t depth)
  821. {
  822.     int node_was_committed = 0;
  823.  
  824.     /* Leaf Node? */
  825.     if(node == NULL)
  826.         return NULL;
  827.   
  828.     gossip_debug(GOSSIP_REQUEST_DEBUG,"%s: commit node %p\n", __func__, node);
  829.  
  830.     /* catches any previously packed structures */
  831.     if (node->committed == -1)
  832.     {
  833.         node->committed = 0;
  834.         node_was_committed = 1;
  835.     }
  836.  
  837.     /* this node was previously committed */
  838.     if (node->committed)
  839.     {
  840.         gossip_debug(GOSSIP_REQUEST_DEBUG,"previously commited %d\n", node->committed);
  841.         return ®ion[node->committed]; /* should contain the index */
  842.     }
  843.  
  844.     /* Copy node to contiguous region */
  845.     gossip_debug(GOSSIP_REQUEST_DEBUG,"node stored at %d\n", *index);
  846.     memcpy(®ion[*index], node, sizeof(struct PINT_Request));
  847.     node->committed = *index;
  848.     *index = *index + 1;
  849.  
  850.     /* Update ereq so that the relative positions are maintained */
  851.     if (node->ereq)
  852.     {
  853.         region[node->committed].ereq =
  854.                 PINT_do_request_commit(region, node->ereq, index, depth+1);
  855.     }
  856.     else
  857.     {
  858.         region[node->committed].ereq = NULL;
  859.     }
  860.  
  861.     /* Update sreq so that the relative positions are maintained */
  862.     if (node->sreq)
  863.     {
  864.         region[node->committed].sreq =
  865.                 PINT_do_request_commit(region, node->sreq, index, depth+1);
  866.     }
  867.     else
  868.     {
  869.         region[node->committed].sreq = NULL;
  870.     }
  871.  
  872.     if (depth == 0)
  873.     {
  874.         gossip_debug(GOSSIP_REQUEST_DEBUG,"clearing tree\n");
  875.         PINT_do_clear_commit(node, 0);
  876.         /* this indicates the region is packed */
  877.         region->committed = -1;
  878.         /* if the original request was committed, this restores that */
  879.         if (node_was_committed)
  880.         {
  881.             node->committed = -1;
  882.         }
  883.     }
  884.  
  885.     /* Return the index of the committed struct */ 
  886.     return ®ion[node->committed]; 
  887. }
  888.  
  889. /* This function converts pointers to array indexes for transport
  890.  * The input Request MUST be committed
  891.  */
  892. int PINT_request_encode(struct PINT_Request *req)
  893. {
  894.     int r;
  895.     if (!PINT_REQUEST_IS_PACKED(req))
  896.         return -1;
  897.     for (r = 0; r <= PINT_REQUEST_NEST_SIZE(req); r++)
  898.     {
  899.         if (req[r].ereq)
  900.             req[r].ereq = (PINT_Request *) (req[r].ereq - &req[0]);
  901.         else
  902.             req[r].ereq = (PINT_Request *) -1;
  903.         if (req[r].sreq)
  904.             req[r].sreq = (PINT_Request *) (req[r].sreq - &req[0]);
  905.         else
  906.             req[r].sreq = (PINT_Request *) -1;
  907.     }
  908.     return 0;
  909. }
  910.  
  911. /* This function coverts array indexes back to pointers after transport
  912.  * The input Request MUST be committed
  913.  */
  914. int PINT_request_decode(struct PINT_Request *req)
  915. {
  916.     int r;
  917.     if (!PINT_REQUEST_IS_PACKED(req))
  918.         return -1;
  919.     for (r = 0; r <= PINT_REQUEST_NEST_SIZE(req); r++)
  920.     {
  921.         /* type must match the encoding type in encode_PVFS_Request */
  922.         if ((u_int32_t)(intptr_t) req[r].ereq == (u_int32_t) -1)
  923.             req[r].ereq = 0;
  924.         else
  925.             req[r].ereq = &req[0] + (unsigned long) req[r].ereq;
  926.         if ((u_int32_t)(intptr_t) req[r].sreq == (u_int32_t) -1)
  927.             req[r].sreq = 0;
  928.         else
  929.             req[r].sreq = &req[0] + (unsigned long) req[r].sreq;
  930.     }
  931.     return 0;
  932. }
  933.     
  934.  
  935. void PINT_dump_packed_request(PINT_Request *req)
  936. {
  937.     int i;
  938.     if (!PINT_REQUEST_IS_PACKED(req))
  939.         return;
  940.     for (i = 0; i < PINT_REQUEST_NEST_SIZE(req)+1; i++)
  941.     {
  942.         PINT_dump_request(req+i);
  943.     }
  944. }
  945.  
  946. void PINT_dump_request(PINT_Request *req)
  947. {
  948.     gossip_debug(GOSSIP_REQUEST_DEBUG,"**********************\n");
  949.     gossip_debug(GOSSIP_REQUEST_DEBUG,"address:\t%p\n",req);
  950.     gossip_debug(GOSSIP_REQUEST_DEBUG,"offset:\t\t%d\n",(int)req->offset);
  951.     gossip_debug(GOSSIP_REQUEST_DEBUG,"num_ereqs:\t%d\n",(int)req->num_ereqs);
  952.     gossip_debug(GOSSIP_REQUEST_DEBUG,"num_blocks:\t%d\n",(int)req->num_blocks);
  953.     gossip_debug(GOSSIP_REQUEST_DEBUG,"stride:\t\t%d\n",(int)req->stride);
  954.     gossip_debug(GOSSIP_REQUEST_DEBUG,"ub:\t\t%d\n",(int)req->ub);
  955.     gossip_debug(GOSSIP_REQUEST_DEBUG,"lb:\t\t%d\n",(int)req->lb);
  956.     gossip_debug(GOSSIP_REQUEST_DEBUG,"agg_size:\t%d\n",(int)req->aggregate_size);
  957.     gossip_debug(GOSSIP_REQUEST_DEBUG,"num_chunk:\t%d\n",(int)req->num_contig_chunks);
  958.     gossip_debug(GOSSIP_REQUEST_DEBUG,"depth:\t\t%d\n",(int)req->depth);
  959.     gossip_debug(GOSSIP_REQUEST_DEBUG,"num_nest:\t%d\n",(int)req->num_nested_req);
  960.     gossip_debug(GOSSIP_REQUEST_DEBUG,"commit:\t\t%d\n",(int)req->committed);
  961.     gossip_debug(GOSSIP_REQUEST_DEBUG,"refcount:\t\t%d\n",(int)req->refcount);
  962.     gossip_debug(GOSSIP_REQUEST_DEBUG,"ereq:\t\t%p\n",req->ereq);
  963.     gossip_debug(GOSSIP_REQUEST_DEBUG,"sreq:\t\t%p\n",req->sreq);
  964.     gossip_debug(GOSSIP_REQUEST_DEBUG,"**********************\n");
  965. }
  966.  
  967. /*
  968.  * Local variables:
  969.  *  mode: c
  970.  *  c-indent-level: 4
  971.  *  c-basic-offset: 4
  972.  * End:
  973.  *
  974.  * vim: ft=c ts=4 sts=4 sw=4 expandtab
  975.  */
  976.