home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / server / create-immutable-copies.sm < prev    next >
Text File  |  2010-08-31  |  78KB  |  2,121 lines

  1. /* 
  2.  * (C) 2001 Clemson University and The University of Chicago 
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. /* adding a comment */
  8.  
  9. #include <string.h>
  10. #include <assert.h>
  11.  
  12. #include "server-config.h"
  13. #include "pvfs2-server.h"
  14. #include "pvfs2-attr.h"
  15. #include "pvfs2-internal.h"
  16. #include "pvfs2-util.h"
  17. #include "pint-util.h"
  18. #include "pint-eattr.h"
  19. #include "pint-cached-config.h"
  20. #include "pvfs2-dist-basic.h"
  21. #include "pvfs2-mirror.h"
  22.  
  23. /*Global Variables*/
  24.  
  25. /*all bits turned on*/
  26. static uint64_t UINT64_HIGH = 0xffffffffffffffffULL; 
  27.  
  28. /*attribute keys used for the mirroring process*/
  29. static char handle_key[]     = USER_PVFS2_MIRROR_HANDLES;
  30. static char copy_count_key[] = USER_PVFS2_MIRROR_COPIES;
  31. static char status_key[]     = USER_PVFS2_MIRROR_STATUS;
  32. static char mode_key[]       = USER_PVFS2_MIRROR_MODE;
  33.  
  34. enum {
  35.    LOCAL_HANDLES = 100,
  36.    REMOTE_HANDLES,
  37.    REPLACE_DONE,
  38.    LOCAL_SRC,
  39.    REMOTE_SRC,
  40.    RETRY,
  41.    NOTHING_TO_DO
  42. };
  43.  
  44.  
  45. #define SERVER_NAME_MAX   1024
  46. #define WRITE_RETRY_LIMIT 2
  47. #define DEFAULT_COPIES    1
  48.  
  49.  
  50. /*helper macros*/
  51.  
  52. /*Sets up a two dimensional array from a one dimensional array*/
  53. #define ONE_DIM_TO_TWO_DIMS(in,out,rows,cols,type) \
  54.   do {                                             \
  55.      int i;                                        \
  56.      type *p;                                      \
  57.      for (i=0,p=in; i<rows; i++,p+=cols)           \
  58.          out[i] = p;                               \
  59.   } while (0)
  60.  
  61.  
  62. /*prototypes*/
  63. static int mirror_comp_fn(
  64.            void *v_p
  65.           ,struct PVFS_server_resp *resp_p
  66.           ,int i);
  67.  
  68. static PVFS_handle *reorganize_copy_handles(
  69.            struct PINT_server_create_copies_op *imm_p);
  70.  
  71. static int get_server_names(PINT_server_create_copies_op *imm_p);
  72.  
  73.  
  74. /*start of state machine*/
  75. %%
  76.  
  77. nested machine pvfs2_create_immutable_copies_sm
  78. {
  79.    state initialize_structures
  80.     {
  81.         run initialize_structures;
  82.         success => obtain_source_info;
  83.         default => cleanup;
  84.     } 
  85.  
  86.    state obtain_source_info
  87.     {
  88.         run obtain_source_info;
  89.         success => inspect_source_info;
  90.         default => cleanup;
  91.     }
  92.  
  93.    state inspect_source_info
  94.     {
  95.         run inspect_source_info;
  96.         success => create_local_datahandles;
  97.         default => cleanup;
  98.     }
  99.  
  100.    state create_local_datahandles
  101.     {
  102.         run create_local_datahandles;
  103.         success => obtain_local_handle_sizes;
  104.         default => cleanup;
  105.     }
  106.  
  107.    state obtain_local_handle_sizes
  108.     {
  109.         run obtain_local_handle_sizes;
  110.         success => inspect_local_handle_sizes;
  111.         default => cleanup;
  112.     }
  113.  
  114.    state inspect_local_handle_sizes
  115.     {
  116.         run inspect_local_handle_sizes;
  117.         success => create_remote_datahandles;
  118.         default => cleanup;     
  119.     }
  120.  
  121.    state create_remote_datahandles
  122.     {
  123.         run create_remote_datahandles;
  124.         success => setup_datahandle_copies;
  125.         default => remove_local_datahandle_objects;
  126.     }
  127.  
  128.    state setup_datahandle_copies
  129.     {
  130.         run setup_datahandle_copies;
  131.         success => copy_data;
  132.         default => remove_local_datahandle_objects;
  133.     }
  134.  
  135.    state copy_data
  136.     {
  137.         pjmp copy_data
  138.         {
  139.            LOCAL_SRC  => pvfs2_pjmp_mirror_work_sm;
  140.            REMOTE_SRC => pvfs2_pjmp_call_msgpairarray_sm; 
  141.         }
  142.         success => check_copy_results;
  143.         default => cleanup;
  144.     }
  145.  
  146.    state check_copy_results
  147.     {
  148.         run check_copy_results;
  149.         success => store_mirror_info;
  150.         default => check_for_retries;
  151.     }
  152.  
  153.    state check_for_retries
  154.     {
  155.         run check_for_retries;
  156.         RETRY   => copy_data;
  157.         default => store_mirror_info;
  158.     }
  159.  
  160.    state store_mirror_info
  161.     {
  162.         run store_mirror_info;
  163.         /*default => replace_remote_datahandle_objects;*/
  164.         /*If the write of the datahandle information fails, even though the  */
  165.         /*the copies actually exist, the metadata for the logical file will  */
  166.         /*NOT have knowledge of it.                                          */
  167.         default => check_store_job;
  168.     }
  169.  
  170.    state check_store_job
  171.     {
  172.         run check_store_job;
  173.         default => cleanup;
  174.     }
  175.  
  176.    state replace_remote_datahandle_objects
  177.     {
  178.         run replace_remote_datahandle_objects;
  179.         REPLACE_DONE => remove_local_datahandle_objects;
  180.         default      => replace_remote_datahandle_objects;
  181.     }
  182.  
  183.    state remove_local_datahandle_objects
  184.     {
  185.         run remove_local_datahandle_objects;
  186.         default => cleanup;
  187.     }
  188.  
  189.    state cleanup
  190.     {
  191.         run cleanup;
  192.         default => return;
  193.     }
  194.  
  195. } /*end nested state machine pvfs2_create_immutable_copies_sm*/
  196. %%
  197.  
  198.  
  199. /************************************************************************/
  200. /*Actions for pvfs2_create_immutable_copies_sm                          */
  201. /************************************************************************/
  202. static PINT_sm_action initialize_structures (struct PINT_smcb *smcb
  203.                                             ,job_status_s *js_p)
  204. {
  205.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing initialize_structures....\n");
  206.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tframe count is %d.\n",smcb->frame_count);
  207.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\t base frame is %d.\n",smcb->base_frame);
  208.  
  209.    struct PINT_server_op *s_op = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  210.    PINT_server_create_copies_op *imm_p = &(s_op->u.create_copies);
  211.  
  212.    int ret;
  213.  
  214.    js_p->error_code = 0;
  215.  
  216.    /* These values are generated by default when the prelude executes.        */
  217.    /* When seteattr executes, the prelude retrieves the common metadata info. */
  218.    imm_p->dfile_count = s_op->target_object_attr->u.meta.dfile_count;
  219.    imm_p->metadata_handle = s_op->target_handle;
  220.    imm_p->fs_id = s_op->target_fs_id;
  221.  
  222.    /* Get the number of IO servers currently running for the given filesystem */
  223.    ret = PINT_cached_config_get_num_io(imm_p->fs_id,&(imm_p->io_servers_count));
  224.    if (ret)
  225.    {
  226.       js_p->error_code = ret;
  227.       return SM_ACTION_COMPLETE;
  228.    }
  229.  
  230.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tdfile_count: %d\tmetadata_handle: %llu"
  231.                                     "\tfs_id: %u"
  232.                                     "\tio_servers_count: %d\n"
  233.                                    ,imm_p->dfile_count
  234.                                    ,llu(imm_p->metadata_handle)
  235.                                    ,imm_p->fs_id
  236.                                    ,imm_p->io_servers_count );
  237.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tds_attr.b_size:%d\n"
  238.                                    ,(int)s_op->ds_attr.u.datafile.b_size);
  239.  
  240.  
  241.    return SM_ACTION_COMPLETE;
  242. } /*end action initialize_structures*/
  243.  
  244.  
  245.  
  246. static PINT_sm_action obtain_source_info (struct PINT_smcb *smcb
  247.                                          ,job_status_s *js_p)
  248. {
  249.  /*In this state, we are retrieving the data handles, the number of      */
  250.  /*desired copies, and the mirroring mode for the given meta data handle.*/
  251.  /*If the mirroring mode == NO_MIRRORING, then we will not perform the   */
  252.  /*mirror operation.                                                     */   
  253.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing obtain_source_info....\n");
  254.  
  255.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  256.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  257.    int keyval_count = 3;
  258.    job_id_t job_id;
  259.    int ret = 0,i;
  260.  
  261.    js_p->error_code = 0;
  262.  
  263.    /*allocate space to retrieve attributes from trove.*/
  264.    sm_p->keyval_count = keyval_count;
  265.  
  266.    sm_p->key_a   = malloc(sizeof(*sm_p->key_a)   * sm_p->keyval_count);
  267.    sm_p->val_a   = malloc(sizeof(*sm_p->val_a)   * sm_p->keyval_count);
  268.    sm_p->error_a = malloc(sizeof(*sm_p->error_a) * sm_p->keyval_count);
  269.    if (!sm_p->key_a || !sm_p->val_a || !sm_p->error_a)
  270.       goto error_exit;
  271.   
  272.    memset(sm_p->key_a  , 0, sizeof(*sm_p->key_a)   * sm_p->keyval_count);
  273.    memset(sm_p->val_a  , 0, sizeof(*sm_p->val_a)   * sm_p->keyval_count);
  274.    memset(sm_p->error_a, 0, sizeof(*sm_p->error_a) * sm_p->keyval_count);
  275.  
  276.    /*setup key/val to retreive the mirroring mode*/
  277.    i=0; assert(i<keyval_count);
  278.    sm_p->key_a[i].buffer = mode_key;
  279.    sm_p->key_a[i].buffer_sz = sizeof(mode_key);
  280.  
  281.    sm_p->val_a[i].buffer = &(imm_p->mirror_mode);
  282.    sm_p->val_a[i].buffer_sz = sizeof(imm_p->mirror_mode);
  283.  
  284.  
  285.    /*setup key/val to retreive the datahandles*/
  286.    i++; assert(i<keyval_count);
  287.    sm_p->key_a[i].buffer    = Trove_Common_Keys[METAFILE_HANDLES_KEY].key;
  288.    sm_p->key_a[i].buffer_sz = Trove_Common_Keys[METAFILE_HANDLES_KEY].size;
  289.  
  290.    imm_p->handle_array_base = malloc(imm_p->dfile_count * sizeof(PVFS_handle));
  291.    if (!imm_p->handle_array_base)
  292.       goto error_exit;
  293.    sm_p->val_a[i].buffer    =  imm_p->handle_array_base;
  294.    sm_p->val_a[i].buffer_sz = (imm_p->dfile_count * sizeof(PVFS_handle));
  295.  
  296.    /*setup key/val to retreive the number of copies*/
  297.    i++; assert(i<keyval_count);
  298.    sm_p->key_a[i].buffer    = copy_count_key;
  299.    sm_p->key_a[i].buffer_sz = sizeof(copy_count_key);
  300.  
  301.    sm_p->val_a[i].buffer    = &(imm_p->copies);
  302.    sm_p->val_a[i].buffer_sz = sizeof(imm_p->copies);
  303.  
  304.  
  305. /***** don't need to get file's distriubtion information, because we are */
  306. /***** copying each datahandle, as is, directly into a new datahandle.   */
  307. /***** Distribution is only needed when you are modifying the logical    */
  308. /***** file, not the individual data handles.  However, we need to pro-  */
  309. /***** vide a distribution value for the IO request, even though it won't*/
  310. /***** be used.  So, instead of issuing a trove call to get the file's   */
  311. /***** distribution information, we will be using just the "basic" dis-  */
  312. /***** tribution.                                                        */
  313.  
  314.    imm_p->dist = malloc(sizeof(PINT_dist));
  315.    if (!imm_p->dist)
  316.       goto error_exit;
  317.    memset(imm_p->dist,0,sizeof(PINT_dist));
  318.  
  319.    imm_p->dist->dist_name = malloc(PVFS_DIST_BASIC_NAME_SIZE);
  320.    if (!imm_p->dist->dist_name)
  321.       goto error_exit;
  322.    strcpy(imm_p->dist->dist_name,PVFS_DIST_BASIC_NAME);
  323.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tdistribution name:%s\n"
  324.                                    ,imm_p->dist->dist_name);
  325.  
  326.    ret = PINT_dist_lookup(imm_p->dist);
  327.    if (ret)
  328.    {
  329.       gossip_lerr("Error looking up basic distribution:%d",ret);
  330.       js_p->error_code = ret;
  331.       goto error_exit;
  332.    }
  333.  
  334.    /*retrieve key/val pairs */
  335.    ret = job_trove_keyval_read_list( imm_p->fs_id
  336.                                     ,imm_p->metadata_handle
  337.                                     ,sm_p->key_a
  338.                                     ,sm_p->val_a
  339.                                     ,sm_p->error_a
  340.                                     ,sm_p->keyval_count
  341.                                     ,0
  342.                                     ,NULL
  343.                                     ,smcb
  344.                                     ,0
  345.                                     ,js_p
  346.                                     ,&job_id
  347.                                     ,server_job_context
  348.                                     ,NULL );
  349.     return (ret);
  350.  
  351. error_exit:
  352.    if (sm_p->key_a)
  353.       free (sm_p->key_a);
  354.    if (sm_p->val_a)
  355.       free (sm_p->val_a);
  356.    if (sm_p->error_a)
  357.       free(sm_p->error_a);
  358.    sm_p->key_a   = sm_p->val_a = NULL;
  359.    sm_p->error_a = NULL;
  360.  
  361.    if (imm_p->dist && imm_p->dist->dist_name)
  362.       free(imm_p->dist->dist_name);
  363.    if (imm_p->dist)
  364.       free(imm_p->dist);
  365.    imm_p->dist = NULL;
  366.  
  367.    if (js_p->error_code == 0)
  368.       js_p->error_code = -PVFS_ENOMEM;
  369.    return SM_ACTION_COMPLETE;
  370. }/*end action obtain_source_info*/
  371.  
  372.  
  373.  
  374.  
  375.  
  376. static PINT_sm_action inspect_source_info (struct PINT_smcb *smcb
  377.                                           ,job_status_s *js_p)
  378. {   
  379.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing inspect_source_info....\n");
  380.  
  381.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  382.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  383.    char server_name[SERVER_NAME_MAX] = {0};
  384.    server_configuration_s *config = get_server_config_struct();
  385.    int ret = 0;
  386.    int i,j;
  387.  
  388.    /*check error codes from previous trove read-list call. */
  389.    for (i=0; i<sm_p->keyval_count; i++)
  390.    {
  391.       /*if the mirroring mode has no entry, the mode=NO_MIRRORING, or the mode*/
  392.       /*is not the expected mode, then there is nothing to do.                */
  393.       if (sm_p->key_a[i].buffer == mode_key)
  394.       {
  395.           if (PVFS_get_errno_mapping(sm_p->error_a[i]) == ENOENT)
  396.           {
  397.                  js_p->error_code = NOTHING_TO_DO;
  398.                  goto error_exit;
  399.           }
  400.           imm_p->mirror_mode = *(MIRROR_MODE *)sm_p->val_a[i].buffer;
  401.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRetrieved mirroring mode is %d.\n"
  402.                                           ,imm_p->mirror_mode);
  403.           if (imm_p->mirror_mode == NO_MIRRORING ||
  404.               imm_p->mirror_mode != imm_p->expected_mirror_mode)
  405.           {
  406.              js_p->error_code = NOTHING_TO_DO;
  407.              goto error_exit;
  408.           }
  409.       } 
  410.  
  411.       /*if the user hasn't set the number of copies, this code will */
  412.       /*set a default (currently = 1).                              */
  413.       if (sm_p->key_a[i].buffer == copy_count_key)
  414.       { 
  415.          if (PVFS_get_errno_mapping(sm_p->error_a[i]) == ENOENT)
  416.          {
  417.             gossip_lerr("User-defined number of copies not found. "
  418.                         "Defaulting number of copies to %d.\n"
  419.                        ,DEFAULT_COPIES);
  420.             imm_p->copies = DEFAULT_COPIES;
  421.             continue; 
  422.          }
  423.       }
  424.  
  425.       /*check for other types of errors.*/
  426.       if (sm_p->error_a[i])
  427.       {
  428.           gossip_lerr("Error retrieving value for '%s' : %s\n"
  429.                      ,(char *)sm_p->key_a[i].buffer
  430.                      ,strerror(PVFS_get_errno_mapping(-sm_p->error_a[i])));
  431.           js_p->error_code = sm_p->error_a[i];
  432.           goto error_exit;
  433.       }
  434.    }/*end for*/
  435.  
  436.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRetrieved # of copies:%d\n"
  437.                                    ,imm_p->copies);
  438.  
  439.    /*If there is only one server running, then it makes no sense to create */
  440.    /*copies on the same server.                                            */
  441.    if (imm_p->io_servers_count == 1)
  442.    {
  443.        gossip_lerr("Mirroring operation is not permitted when only one "
  444.                    "I/O server is running.\n");
  445.        js_p->error_code = -PVFS_EPERM;
  446.        return SM_ACTION_COMPLETE;
  447.    }
  448.  
  449.    /* We need at least (# of copies) + 1 I/O servers running in the system to */
  450.    /* prevent duplicate data on any one server, while not exceeding the number*/
  451.    /* of I/O servers in the system. If the number of copies requested by the  */
  452.    /* user is >= the number of I/O servers in the system, then we lower the   */
  453.    /* number of requested copies.  We then set the number of I/O servers      */
  454.    /* required to meet this request with the (new value of copies) + 1.       */
  455.    /* At this point, if the number of I/O servers required is less than the   */
  456.    /* number of servers in this file's distribution, then set the number of   */
  457.    /* required I/O servers to the same number of servers in this file's dis-  */
  458.    /* tribution.                                                              */
  459.    if (imm_p->copies >= imm_p->io_servers_count)
  460.        imm_p->copies = imm_p->io_servers_count - 1;
  461.  
  462.    imm_p->io_servers_required = imm_p->copies + 1;
  463.  
  464.    if (imm_p->io_servers_required < imm_p->dfile_count)
  465.        imm_p->io_servers_required = imm_p->dfile_count;
  466.  
  467.    /*allocate space for io_servers array.  this array will contain the server */
  468.    /*names which will be used as the valid destination remotes for the copies.*/
  469.    imm_p->io_servers = malloc( imm_p->io_servers_required * sizeof(char *) );
  470.    if ( !imm_p->io_servers )
  471.    {
  472.        js_p->error_code = -PVFS_ENOMEM;
  473.        goto error_exit;
  474.    }
  475.    memset(imm_p->io_servers,0,imm_p->io_servers_required * sizeof(char *));
  476.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tAllocated char *:\n");
  477.    for (i=0; i<imm_p->io_servers_required; i++)
  478.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\t  io_servers[%d] : %p "
  479.                                         "\t &io_servers[%d] : %p\n"
  480.                                        ,i,imm_p->io_servers[i]
  481.                                        ,i,&(imm_p->io_servers[i])); 
  482.    imm_p->num_io_servers = imm_p->io_servers_required;
  483.    for (i=0;i<imm_p->io_servers_required;i++)
  484.    {
  485.        imm_p->io_servers[i] = malloc( sizeof(server_name) );
  486.        if ( !imm_p->io_servers[i] )
  487.        {
  488.            js_p->error_code = -PVFS_ENOMEM;
  489.            goto error_exit;
  490.        }
  491.        memset(imm_p->io_servers[i],0,sizeof(server_name));
  492.    }
  493.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tAllocated server_name..\n");
  494.    for (i=0; i<imm_p->io_servers_required; i++)
  495.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\t  io_servers[%d] : %p "
  496.                                         "\t *io_servers[%d] : %s "
  497.                                         "\t &io_servers[%d] : %p\n"
  498.                                        ,i,imm_p->io_servers[i]
  499.                                        ,i,imm_p->io_servers[i]
  500.                                        ,i,&(imm_p->io_servers[i])); 
  501.  
  502.  
  503.  
  504.    /*allocate space for the local source handles*/
  505.    imm_p->handle_array_base_local = 
  506.        malloc(imm_p->dfile_count * sizeof(PVFS_handle));
  507.    if (!imm_p->handle_array_base_local)
  508.    {
  509.       js_p->error_code = -PVFS_ENOMEM;
  510.       goto error_exit;
  511.    }
  512.    memset(imm_p->handle_array_base_local
  513.          ,0
  514.          ,imm_p->dfile_count * sizeof(PVFS_handle));
  515.    imm_p->handle_array_base_local_count = 0;
  516.  
  517.  
  518.    /*allocate space for the data handle copies array, which will hold a       */
  519.    /*combination of local and remote data handles used as destination handles */
  520.    /*for the copies. The order of this array will mimmick the order of the    */
  521.    /*original data file array.  Thus, handle_array_base[i]                    */
  522.    /*and handle_array_copies[i] will hold handles for the same server number. */
  523.    imm_p->handle_array_copies = malloc(imm_p->io_servers_required  
  524.                                        * imm_p->copies      
  525.                                        * sizeof(PVFS_handle));
  526.    if ( !imm_p->handle_array_copies )
  527.    {
  528.        js_p->error_code = -PVFS_ENOMEM;
  529.        goto error_exit;
  530.    }
  531.    memset(imm_p->handle_array_copies,0,imm_p->io_servers_required  
  532.                                        * imm_p->copies      
  533.                                        * sizeof(PVFS_handle));
  534.  
  535.    /*allocate space for the local_io_servers array.  this array contains the */
  536.    /*server names for local data handles.                                    */
  537.    imm_p->local_io_servers = malloc(  imm_p->io_servers_required
  538.                                     * sizeof(char *));
  539.    if ( !imm_p->local_io_servers )
  540.    {
  541.        js_p->error_code = -PVFS_ENOMEM;  
  542.        goto error_exit;
  543.  
  544.    }
  545.    memset(imm_p->local_io_servers
  546.          ,0
  547.          ,imm_p->io_servers_required * sizeof(char *));
  548.    imm_p->local_io_servers_count = 0;
  549.  
  550.  
  551.  
  552.    /*allocate space for the remote_io_servers array.  this array contains the */
  553.    /*server names for remote data handles.                                    */
  554.    imm_p->remote_io_servers = malloc(  imm_p->io_servers_required 
  555.                                      * sizeof(char*));
  556.    if ( !imm_p->remote_io_servers )
  557.    {
  558.        js_p->error_code = -PVFS_ENOMEM;
  559.        goto error_exit;
  560.    }
  561.    memset(imm_p->remote_io_servers
  562.          ,0
  563.          ,imm_p->io_servers_required * sizeof(char*));
  564.    imm_p->remote_io_servers_count = 0;
  565.  
  566.    /*populate the io_servers array with server_names:                         */
  567.    /*Step 1:  Always start by using the server names associated with the      */
  568.    /*         original datahandles, keeping the order in tact.                */
  569.    /*Step 2:  If additional servers are needed, then tap into the list of     */
  570.    /*         servers running in the file system and grab those that are not  */
  571.    /*         currently in the io_servers list.                               */
  572.  
  573.    /*Step 1*/   
  574.    for (i=0; i<imm_p->dfile_count; i++)
  575.    {
  576.       ret = PINT_cached_config_get_server_name( imm_p->io_servers[i], 
  577.                                                 sizeof(server_name)-1,
  578.                                                 imm_p->handle_array_base[i],
  579.                                                 imm_p->fs_id );
  580.       if (ret)
  581.       {
  582.          js_p->error_code = ret;
  583.          goto error_exit;
  584.       }
  585.       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tValue of server_name is %s "
  586.                                        "for handle %llu\n"
  587.                                       ,imm_p->io_servers[i]
  588.                                       ,llu(imm_p->handle_array_base[i]));
  589.    }/*end for*/
  590.  
  591.    /*Step 2*/
  592.    if (imm_p->io_servers_required > imm_p->dfile_count)
  593.    {
  594.       ret = get_server_names(imm_p);
  595.       if (ret)
  596.       {
  597.           gossip_lerr("Unable to populate io_servers list.\n");
  598.           js_p->error_code = ret;
  599.           goto error_exit;
  600.       }
  601.    }
  602.  
  603.  
  604.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tconfig->host_id is %s.\n"
  605.                                    ,config->host_id);
  606.       
  607.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\timm_p->io_servers_required : %d\n"
  608.                                    ,imm_p->io_servers_required);
  609.    for (i=0,j=0; i<imm_p->io_servers_required; i++)
  610.    {
  611.       char *server_name = imm_p->io_servers[i];
  612.       if (strncmp(server_name,config->host_id,SERVER_NAME_MAX-1) == 0)
  613.       {/*local*/
  614.          gossip_debug(GOSSIP_MIRROR_DEBUG,"\tprocessing local....\n");
  615.          imm_p->local_io_servers[imm_p->handle_array_copies_local_count] =
  616.              malloc( SERVER_NAME_MAX );
  617.          if ( !imm_p->local_io_servers[imm_p->handle_array_copies_local_count] )
  618.          {
  619.               js_p->error_code = -PVFS_ENOMEM;
  620.               goto error_exit;
  621.          }
  622.          memset(imm_p->local_io_servers[imm_p->handle_array_copies_local_count]
  623.                ,0
  624.                ,SERVER_NAME_MAX);
  625.          memcpy(imm_p->local_io_servers[imm_p->handle_array_copies_local_count],
  626.                 server_name,SERVER_NAME_MAX-1);
  627.          if ( i < imm_p->dfile_count )
  628.          {
  629.             imm_p->handle_array_base_local[imm_p->handle_array_base_local_count] 
  630.             =
  631.             imm_p->handle_array_base[i];
  632.             imm_p->handle_array_base_local_count++;
  633.             gossip_debug(GOSSIP_MIRROR_DEBUG,"\tlocal source handle(%d):%llu\n"
  634.               ,imm_p->handle_array_base_local_count
  635.     ,llu(imm_p->handle_array_base_local[imm_p->handle_array_base_local_count]));
  636.          }
  637.  
  638.          imm_p->handle_array_copies_local_count++;
  639.       } 
  640.       else
  641.       {/*remote*/
  642.          imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count] =
  643.              malloc( SERVER_NAME_MAX );
  644.          if (!imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count])
  645.          {
  646.                js_p->error_code = -PVFS_ENOMEM;
  647.                goto error_exit;
  648.          }
  649.         memset(imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count]
  650.               ,0
  651.               ,SERVER_NAME_MAX);
  652.         memcpy(imm_p->remote_io_servers[imm_p->handle_array_copies_remote_count]
  653.               ,server_name,SERVER_NAME_MAX-1);
  654.          imm_p->handle_array_copies_remote_count++;
  655.       }/*end if*/
  656.    }/*end for*/
  657.  
  658.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tLocal: %d\tRemote: %d\n"
  659.                                    ,imm_p->handle_array_copies_local_count
  660.                                    ,imm_p->handle_array_copies_remote_count);
  661.  
  662.    imm_p->local_io_servers_count = imm_p->handle_array_copies_local_count;
  663.    imm_p->remote_io_servers_count = imm_p->handle_array_copies_remote_count;
  664.  
  665.    for (i=0; i<imm_p->local_io_servers_count; i++)
  666.    {
  667.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\tlocal_io_servers[%d]: %s\n",i,
  668.           imm_p->local_io_servers[i]);
  669.    }
  670.    for (i=0; i<imm_p->remote_io_servers_count; i++)
  671.    {
  672.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\tremote_io_servers[%d]: %s\n",i,
  673.           imm_p->remote_io_servers[i]);
  674.    }
  675.  
  676.  
  677.    /*allocate and initialize space for local and remote handle arrays*/
  678.    if (imm_p->handle_array_copies_local_count)
  679.    {
  680.       imm_p->handle_array_copies_local = 
  681.         malloc( imm_p->handle_array_copies_local_count * 
  682.                 imm_p->copies * sizeof(PVFS_handle));
  683.      if ( !imm_p->handle_array_copies_local )
  684.       {
  685.          js_p->error_code = -PVFS_ENOMEM;
  686.          goto error_exit;
  687.       }
  688.    }
  689.    if (imm_p->handle_array_copies_remote_count)
  690.    {
  691.       imm_p->handle_array_copies_remote = 
  692.         malloc( imm_p->handle_array_copies_remote_count * 
  693.                 imm_p->copies * sizeof(PVFS_handle));
  694.       if ( !imm_p->handle_array_copies_remote )
  695.       {
  696.          js_p->error_code = -PVFS_ENOMEM;
  697.          goto error_exit;
  698.       }
  699.       
  700.    }/*end if*/
  701.  
  702.    memset(imm_p->handle_array_copies_local 
  703.          ,0
  704.          ,imm_p->handle_array_copies_local_count  * 
  705.           imm_p->copies * sizeof(PVFS_handle));
  706.    memset(imm_p->handle_array_copies_remote
  707.          ,0
  708.          ,imm_p->handle_array_copies_remote_count * 
  709.           imm_p->copies * sizeof(PVFS_handle));
  710.  
  711. error_exit:
  712.    /*all other memory will be freed in the "cleanup" action.*/
  713.    free(sm_p->key_a);
  714.    free(sm_p->val_a);
  715.    free(sm_p->error_a);
  716.    sm_p->key_a = sm_p->val_a = NULL;
  717.    sm_p->error_a = NULL;
  718.  
  719.    return SM_ACTION_COMPLETE;
  720. }/*end action inspect_source_info*/
  721.  
  722.  
  723.  
  724. /*We must get the bstream size for any datahandles that reside on this server.*/
  725. /*This scenario occurs when a metadata and i/o server are one of the same or  */
  726. /*a metadata server and i/o server are running on the same machine.  I think  */
  727. /*this is outdated now, but I check for it anyway.                            */
  728. static PINT_sm_action obtain_local_handle_sizes(struct PINT_smcb *smcb
  729.                                                ,job_status_s *js_p)
  730. {   
  731.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing obtain_local_handle_sizes....\n");
  732.  
  733.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  734.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  735.    job_id_t job_id;
  736.    int ret = 0;
  737.  
  738.    js_p->error_code = 0;
  739.  
  740.  
  741.    /*Do we have any local handles?*/
  742.    if (imm_p->handle_array_copies_local_count == 0)
  743.        return SM_ACTION_COMPLETE;
  744.  
  745.    sm_p->error_a = malloc(imm_p->handle_array_base_local_count *
  746.                            sizeof(PVFS_error) );
  747.    if (!sm_p->error_a)
  748.    {
  749.       js_p->error_code = -PVFS_ENOMEM;
  750.       goto error_exit;
  751.    }
  752.  
  753.    imm_p->ds_attr_a = malloc(imm_p->handle_array_base_local_count *
  754.                              sizeof(PVFS_ds_attributes));
  755.    if (!imm_p->ds_attr_a)
  756.    {
  757.       js_p->error_code = -PVFS_ENOMEM;
  758.       goto error_exit;
  759.    }
  760.    
  761.    ret = job_trove_dspace_getattr_list(imm_p->fs_id
  762.                                       ,imm_p->handle_array_base_local_count
  763.                                       ,imm_p->handle_array_base_local
  764.                                       ,smcb
  765.                                       ,sm_p->error_a
  766.                                       ,imm_p->ds_attr_a
  767.                                       ,0
  768.                                       ,js_p
  769.                                       ,&job_id
  770.                                       ,server_job_context
  771.                                       ,NULL);
  772.    return ret;
  773.  
  774.  
  775. error_exit:
  776.    if (sm_p->error_a)
  777.       free(sm_p->error_a);
  778.    sm_p->error_a = NULL;
  779.  
  780.    return SM_ACTION_COMPLETE;
  781. }/*end action obtain_local_handle_sizes*/
  782.  
  783.  
  784.  
  785. static PINT_sm_action inspect_local_handle_sizes(struct PINT_smcb *smcb
  786.                                                 ,job_status_s *js_p)
  787. {   
  788.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing inspect_local_handle_sizes..\n");
  789.  
  790.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  791.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  792.    int i,j;
  793.  
  794.  
  795.    /*Do we have any local handles?*/
  796.    if (imm_p->handle_array_copies_local_count == 0)
  797.        return SM_ACTION_COMPLETE;
  798.  
  799.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tchecking for errors....\n");
  800.    /*check for errors*/
  801.    for (i=0; i<imm_p->handle_array_base_local_count; i++)
  802.    {
  803.        if (sm_p->error_a[i])
  804.        {
  805.           js_p->error_code = sm_p->error_a[i];
  806.           free(sm_p->error_a);
  807.           sm_p->error_a = NULL;
  808.           return SM_ACTION_COMPLETE;
  809.        }
  810.    }/*end for*/
  811.    js_p->error_code = 0;
  812.  
  813.    imm_p->bstream_array_base_local = malloc(imm_p->dfile_count *
  814.                                             sizeof(PVFS_size));
  815.    if (!imm_p->bstream_array_base_local)
  816.    {
  817.        js_p->error_code = -PVFS_ENOMEM;
  818.        free(sm_p->error_a);
  819.        sm_p->error_a = NULL;
  820.        return SM_ACTION_COMPLETE;
  821.    }
  822.    memset(imm_p->bstream_array_base_local
  823.          ,0
  824.          ,imm_p->dfile_count * sizeof(PVFS_size));
  825.  
  826.    gossip_debug(GOSSIP_MIRROR_DEBUG
  827.                ,"\tpopulating bstream_array_base_local...\n");
  828.  
  829.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle_array_base_local_count:%d\n"
  830.                                    ,imm_p->handle_array_base_local_count);
  831.    /*populate bstream_array_base_local*/
  832.    for (i=0; i<imm_p->handle_array_base_local_count; i++)
  833.    {
  834.        for (j=0; j<imm_p->dfile_count; j++)
  835.        {
  836.             gossip_debug(GOSSIP_MIRROR_DEBUG,"\tlocal handle(%d):%llu"
  837.                                              "\tbase handle(%d):%llu\n"
  838.                ,i,llu(imm_p->handle_array_base_local[i])
  839.                ,j,llu(imm_p->handle_array_base[j]) );
  840.             if (imm_p->handle_array_base_local[i] == imm_p->handle_array_base[j])
  841.             {
  842.               imm_p->bstream_array_base_local[j] = 
  843.               imm_p->ds_attr_a[i].u.datafile.b_size;
  844.               gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle:%llu\tsize:%d\n"
  845.                   ,llu(imm_p->handle_array_base_local[i])
  846.                   ,(int)imm_p->bstream_array_base_local[j] );
  847.             }
  848.        }/*end for*/
  849.    }/*end for*/
  850.  
  851.    free(sm_p->error_a);
  852.    sm_p->error_a = NULL;
  853.  
  854.    return SM_ACTION_COMPLETE;
  855. }/*end action inspect_local_handle_sizes*/
  856.  
  857.  
  858.  
  859. static PINT_sm_action create_local_datahandles (struct PINT_smcb *smcb
  860.                                                ,job_status_s *js_p)
  861. {   
  862.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing create_local_datahandles....\n");
  863.  
  864.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  865.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  866.    job_id_t job_id;
  867.    int ret = 0;
  868.    int i;
  869.  
  870.    PVFS_handle_extent_array data_handle_ext_array;
  871.    server_configuration_s *config = get_server_config_struct();
  872.  
  873.  
  874.    js_p->error_code = 0;
  875.  
  876.    /*Do we have any local handles?*/
  877.    if (imm_p->handle_array_copies_local_count == 0)
  878.        return SM_ACTION_COMPLETE;
  879.  
  880.  
  881.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Target handle: %llu\tTarget FS ID: %d\n"
  882.                                    ,llu(imm_p->metadata_handle),imm_p->fs_id);
  883.    gossip_debug(GOSSIP_MIRROR_DEBUG,"dfile count: %d\n",imm_p->dfile_count);
  884.    gossip_debug(GOSSIP_MIRROR_DEBUG,"stuffed size: %d\n"
  885.                                 ,sm_p->target_object_attr->u.meta.stuffed_size);
  886.    gossip_debug(GOSSIP_MIRROR_DEBUG,"hint.flags: %llu\n"
  887.                              ,llu(sm_p->target_object_attr->u.meta.hint.flags));
  888.    gossip_debug(GOSSIP_MIRROR_DEBUG,"dfile array P: %p\n"
  889.                                  ,sm_p->target_object_attr->u.meta.dfile_array);
  890.  
  891.    /*find local IO extent array for this file system for metadata host*/
  892.    ret = PINT_cached_config_get_server( imm_p->fs_id
  893.                                        ,config->host_id
  894.                                        ,PINT_SERVER_TYPE_IO
  895.                                        ,&data_handle_ext_array );
  896.    if (ret)
  897.    {
  898.       js_p->error_code = ret;
  899.       return SM_ACTION_COMPLETE;
  900.    } 
  901.    for (i=0;i<data_handle_ext_array.extent_count;i++)
  902.    {
  903.        gossip_debug(GOSSIP_MIRROR_DEBUG,"Extent Range %d:\tfirst  %llu"
  904.                                         "\tlast  %llu\n"
  905.                              ,i,llu(data_handle_ext_array.extent_array[i].first)
  906.                             ,llu(data_handle_ext_array.extent_array[i].last)  );
  907.    }
  908.  
  909.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of copies: %d \tlocation:%p\n"
  910.                                    ,imm_p->copies,&imm_p->copies);
  911.  
  912.  
  913.    /*create local datahandles - will be used as destination handles for copies*/
  914.    ret = job_trove_dspace_create_list( imm_p->fs_id
  915.                                       ,&data_handle_ext_array
  916.                                       ,imm_p->handle_array_copies_local
  917.                                       ,imm_p->handle_array_copies_local_count *
  918.                                        imm_p->copies
  919.                                       ,PVFS_TYPE_DATAFILE
  920.                                       ,NULL
  921.                                       ,TROVE_SYNC
  922.                                       ,smcb
  923.                                       ,0
  924.                                       ,js_p
  925.                                       ,&job_id
  926.                                       ,server_job_context
  927.                                       ,NULL );
  928.    return ret;
  929. } /*end action create_local_datahandles*/
  930.  
  931.  
  932.  
  933.  
  934. static PINT_sm_action create_remote_datahandles (struct PINT_smcb *smcb
  935.                                                 ,job_status_s *js_p)
  936. {   
  937.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing create_remote_datahandles...\n");
  938.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  939.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  940.    int ret = 0,i;
  941.    job_id_t job_id;
  942.  
  943.    js_p->error_code = 0;
  944.  
  945.  
  946.    if (imm_p->handle_array_copies_remote_count == 0)
  947.        return SM_ACTION_COMPLETE;
  948.  
  949.    int rows = imm_p->copies;
  950.    int cols = imm_p->handle_array_copies_remote_count;
  951.  
  952.    imm_p->my_remote_servers = malloc(sizeof(char *) * rows * cols);
  953.    if (!imm_p->my_remote_servers)
  954.    {
  955.        gossip_lerr("Error allocating imm_p->my_remote_servers.\n");
  956.        js_p->error_code = -PVFS_ENOMEM;
  957.        return SM_ACTION_COMPLETE;
  958.    }
  959.    memset(imm_p->my_remote_servers,0,sizeof(char *) * rows * cols);
  960.  
  961.    /*setup my_remote_servers[copy,remote#] = remote server name.  This will */
  962.    /*allow job_precreate_pool to return handle_array_copies_remote where    */
  963.    /*[copy,remote#] = remote handle.  We end up with a list of remotes for  */
  964.    /*each copy in original distribution order.                              */
  965.    for (i=0; i<(rows*cols); i++)
  966.    {
  967.        imm_p->my_remote_servers[i] = imm_p->remote_io_servers[i%cols];
  968.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tremote_io_servers[%d]:%s "
  969.                                         "my_remote_servers[%d]:%s\n"
  970.                                        ,(i%cols)
  971.                                        ,imm_p->remote_io_servers[i%cols]
  972.                                        ,i
  973.                                        ,imm_p->my_remote_servers[i]);
  974.    }
  975.  
  976.   ret = job_precreate_pool_get_handles(imm_p->fs_id
  977.                                        ,rows*cols,
  978.                                        PVFS_TYPE_DATAFILE,
  979.                                        (const char **)imm_p->my_remote_servers
  980.                                        ,imm_p->handle_array_copies_remote
  981.                                        ,0
  982.                                        ,smcb
  983.                                        ,0
  984.                                        ,js_p
  985.                                        ,&job_id
  986.                                        ,server_job_context
  987.                                        ,NULL);
  988.    return ret;
  989. }/*end action create_remote_datahandles*/
  990.  
  991.  
  992.  
  993.  
  994. static PINT_sm_action setup_datahandle_copies (struct PINT_smcb *smcb
  995.                                               ,job_status_s *js_p)
  996. {
  997.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing setup_datahandle_copies...\n");
  998.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  999.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  1000.    server_configuration_s *config = get_server_config_struct();
  1001.    int i,j,k;
  1002.    
  1003.  
  1004.    js_p->error_code = 0;
  1005.  
  1006.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRemote destination handles:\n");
  1007.    int rows = imm_p->copies;
  1008.    int cols = imm_p->handle_array_copies_remote_count;
  1009.    for (i=0; i<(rows*cols); i++)
  1010.    {
  1011.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tRemote handle(%d):%llu\n"
  1012.                                     ,i
  1013.                                     ,llu(imm_p->handle_array_copies_remote[i]));
  1014.    }
  1015.  
  1016.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tLocal destination handles:\n");
  1017.    cols = imm_p->handle_array_copies_local_count;
  1018.    for (i=0; i<(rows*cols); i++)
  1019.    {
  1020.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tLocal handle(%d):%llu\n"
  1021.                                     ,i
  1022.                                     ,llu(imm_p->handle_array_copies_local[i]));
  1023.    }
  1024.  
  1025.    for (i=0,j=0,k=0; i<(imm_p->io_servers_required * imm_p->copies); i++)
  1026.    {
  1027.       if ( strncmp(imm_p->io_servers[i%imm_p->io_servers_required]
  1028.                  ,config->host_id,SERVER_NAME_MAX-1) == 0 )
  1029.       {/*local*/
  1030.           memcpy(&imm_p->handle_array_copies[i]
  1031.                 ,&imm_p->handle_array_copies_local[j],sizeof(PVFS_handle));
  1032.           j++;
  1033.       }
  1034.       else
  1035.       {/*remote*/
  1036.           memcpy(&imm_p->handle_array_copies[i]
  1037.                 ,&imm_p->handle_array_copies_remote[k],sizeof(PVFS_handle));
  1038.           k++;
  1039.       }       
  1040.    }/*end for*/
  1041.  
  1042.    for (i=0; i<(imm_p->io_servers_required * imm_p->copies); i++)
  1043.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle_array_copies[%d]:  %llu.\n"
  1044.                                        ,i
  1045.                                        ,llu(imm_p->handle_array_copies[i]));
  1046.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tnumber of io servers required: %d\n"
  1047.                                    ,imm_p->io_servers_required);
  1048.  
  1049.    /*create and initialize the writes_completed array*/
  1050.    imm_p->writes_completed = malloc(sizeof(PVFS_handle) * imm_p->dfile_count
  1051.                                                         * imm_p->copies);
  1052.    if (!imm_p->writes_completed)
  1053.    {
  1054.       gossip_lerr("Unable to allocate imm_p->writes_completed.\n");
  1055.       js_p->error_code = -PVFS_ENOMEM;
  1056.       return SM_ACTION_COMPLETE;
  1057.    }
  1058.    memset(imm_p->writes_completed,UINT64_HIGH,sizeof(PVFS_handle) 
  1059.                                             * imm_p->dfile_count
  1060.                                             * imm_p->copies);
  1061.  
  1062.    /*the retry count is used to monitor how many times we retry a write. */
  1063.    /*this value is incremented in the check_for_retries state.           */
  1064.    imm_p->retry_count = 0;
  1065.  
  1066.    return SM_ACTION_COMPLETE;
  1067. }/*end action setup_datahandle_copies*/
  1068.  
  1069.  
  1070. static PINT_sm_action copy_data (struct PINT_smcb *smcb, job_status_s *js_p)
  1071. {   
  1072.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing copy_data....\n");
  1073.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  1074.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  1075.    server_configuration_s *config = get_server_config_struct();
  1076.    filesystem_configuration_s *fs = 
  1077.                                     PINT_config_find_fs_id(config,imm_p->fs_id);
  1078.    int ret = 0;
  1079.    int src,row,cols,i,index,wc;
  1080.  
  1081.    /*this variable helps to understand the logic better.  it is a redeclara- */
  1082.    /*tion of the one dimensional imm_p->handle_array_copies and can be ac-   */
  1083.    /*cessed as handle_array_copies[copies,dfile_count].                      */
  1084.    PVFS_handle *handle_array_copies[imm_p->copies];
  1085.    memset(handle_array_copies,0,sizeof(PVFS_handle) * imm_p->copies);
  1086.    ONE_DIM_TO_TWO_DIMS(imm_p->handle_array_copies
  1087.                       ,handle_array_copies
  1088.                       ,imm_p->copies,imm_p->io_servers_required
  1089.                       ,PVFS_handle);
  1090.  
  1091.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tone dim to two dims:\n");
  1092.    for (row=0; row<imm_p->copies; row++)
  1093.    {
  1094.        for (cols=0; cols<imm_p->io_servers_required; cols++)
  1095.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\thandle_array_copies[%d][%d] : "
  1096.                                          "%llu\n"
  1097.                                         ,row,cols
  1098.                                         ,llu(handle_array_copies[row][cols]));
  1099.    }
  1100.  
  1101.    js_p->error_code = 0;
  1102.  
  1103.    /*for each source handle[src], create a MIRROR request containing a set of */ 
  1104.    /*destination handles.                                                     */
  1105.    for (src=0; src<imm_p->dfile_count; src++)
  1106.    {
  1107.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tWorking on src #%d\n",src);
  1108.  
  1109.        /* writes_completed indicates the status of each copy for each source: */
  1110.        /* 0 ==> completed, + ==> incomplete, UINT64_HIGH ==> initial state.   */
  1111.        /* If incomplete, the value stored in the array is the destination     */
  1112.        /* handle.                                                             */
  1113.        for (row=src,cols=0; cols<imm_p->copies; cols++)
  1114.        {
  1115.            index = (imm_p->copies * row) + cols;
  1116.            /*this will capture UINT64_HIGH or handle*/
  1117.            if (   imm_p->writes_completed[index] > 0
  1118.                || imm_p->writes_completed[index] == UINT64_HIGH)
  1119.               break;
  1120.        }
  1121.  
  1122.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tValue of cols is %d\n",cols);
  1123.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tValue of imm_p->copies is %d.\n"
  1124.                                        ,imm_p->copies);
  1125.  
  1126.        /*if all copies for this source are zero ==> process next source.*/ 
  1127.        if (cols==imm_p->copies)
  1128.        {
  1129.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tThis source[%d] has all "
  1130.                                            "complete writes.\n"
  1131.                                           ,src);
  1132.           continue;
  1133.        }
  1134.  
  1135.        struct PVFS_server_req *req = malloc(sizeof(struct PVFS_server_req));
  1136.        if (!req)
  1137.        {
  1138.            gossip_lerr("Unable to allocate PVFS_server_req.\n");
  1139.            js_p->error_code = -PVFS_ENOMEM;
  1140.            return SM_ACTION_COMPLETE;
  1141.        }
  1142.        memset(req,0,sizeof(struct PVFS_server_req));
  1143.  
  1144.        req->u.mirror.dst_handle = malloc(sizeof(PVFS_handle) * imm_p->copies);
  1145.        if ( !req->u.mirror.dst_handle)
  1146.        {
  1147.             gossip_lerr("Unable to allocate mirror.dst_handle.\n");
  1148.             js_p->error_code = -PVFS_ENOMEM;
  1149.             return SM_ACTION_COMPLETE;
  1150.        }
  1151.        memset(req->u.mirror.dst_handle,0,sizeof(PVFS_handle) * imm_p->copies);
  1152.  
  1153.        /*index into the writes_completed array for each destination handle*/
  1154.        req->u.mirror.wcIndex = malloc(sizeof(uint32_t) * imm_p->copies);
  1155.        if ( !req->u.mirror.wcIndex )
  1156.        {
  1157.             gossip_lerr("Unable to allocate mirror.wcIndex.\n");
  1158.             js_p->error_code = -PVFS_ENOMEM;
  1159.             return SM_ACTION_COMPLETE;
  1160.        }
  1161.        memset(req->u.mirror.wcIndex,0,sizeof(uint32_t) * imm_p->copies);
  1162.  
  1163.        req->op = PVFS_SERV_MIRROR;
  1164.        req->credentials = sm_p->req->credentials;
  1165.  
  1166.        req->u.mirror.src_handle    = imm_p->handle_array_base[src];
  1167.  
  1168.        
  1169.        /* In the initial state or when all writes have failed, get destination*/
  1170.        /* handles from handle_array_copies array.  Otherwise, use the handles */
  1171.        /* stored in the writes_completed array.                               */
  1172.        index = imm_p->copies*src; /*first copy for this source*/
  1173.        if (imm_p->writes_completed[index] == UINT64_HIGH)
  1174.        {
  1175.           /*handle_array_copies[copy,server#] is accessed as a two-dimensional*/
  1176.           /*array where a row represents a copy and columns represent the     */
  1177.           /*destination handles,in order of the original file distribution. We*/
  1178.           /*map the source handle[i], which is also in distribution order,    */
  1179.           /*to handle_arrray_copies[0,i+1],[1,i+2],..,[n-1,(i+y)-1], where n  */
  1180.           /*is the number of copies and y is the number of handles in one copy*/
  1181.           for (wc=0,row=0,cols=(src+1)%imm_p->io_servers_required;
  1182.                row < imm_p->copies;
  1183.                wc++,row++,cols=(cols+1)%imm_p->io_servers_required)
  1184.           {
  1185.             req->u.mirror.dst_handle[row] = handle_array_copies[row][cols];
  1186.             req->u.mirror.wcIndex[row]     = index + wc;
  1187.             req->u.mirror.dst_count++;
  1188.           }
  1189.        } else {
  1190.           for (row=src,cols=0,i=0; cols<imm_p->copies; cols++,i++)
  1191.           {
  1192.               index = (imm_p->copies*row) + cols;
  1193.               if (imm_p->writes_completed[index] > 0)
  1194.               {
  1195.                  req->u.mirror.dst_handle[i] = imm_p->writes_completed[index];
  1196.                  req->u.mirror.wcIndex[i] = index;
  1197.                  req->u.mirror.dst_count++;
  1198.               }
  1199.           }
  1200.        }
  1201.        req->u.mirror.fs_id         = imm_p->fs_id;
  1202.        req->u.mirror.dist          = imm_p->dist;
  1203.        req->u.mirror.src_server_nr = src;
  1204.        req->u.mirror.flow_type     = fs->flowproto;
  1205.        req->u.mirror.encoding      = fs->encoding;
  1206.  
  1207.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\treq->: src:%llu\tfs_id:%d"
  1208.                                         "\tdist name:%s\tsrc server_nr:%d\n"
  1209.                                        ,llu(req->u.mirror.src_handle)
  1210.                                        ,req->u.mirror.fs_id
  1211.                                        ,req->u.mirror.dist->dist_name
  1212.                                        ,req->u.mirror.src_server_nr );
  1213.        for (i=0; i<req->u.mirror.dst_count; i++)
  1214.            gossip_debug(GOSSIP_MIRROR_DEBUG,"\treq->dst_handle[%d] : %llu\n"
  1215.                                            ,i
  1216.                                            ,llu(req->u.mirror.dst_handle[i]));
  1217.  
  1218.        struct PINT_server_op *mirror_op = 
  1219.                               malloc(sizeof(struct PINT_server_op));
  1220.        if (!mirror_op)
  1221.        {
  1222.            gossip_lerr("Error allocating mirror_op");
  1223.            js_p->error_code = -PVFS_ENOMEM;
  1224.            return SM_ACTION_COMPLETE;
  1225.        }
  1226.        memset(mirror_op,0,sizeof(struct PINT_server_op));
  1227.  
  1228.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tabout to allocate mirror_op...\n");
  1229.  
  1230.        if (imm_p->bstream_array_base_local)
  1231.           req->u.mirror.bsize = imm_p->bstream_array_base_local[src];
  1232.        mirror_op->req = req;
  1233.        mirror_op->op  = req->op;
  1234.        mirror_op->addr = sm_p->addr;/*get addr for this server*/
  1235.  
  1236.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op->req(%p)\n"
  1237.                                        ,mirror_op->req);
  1238.  
  1239.        if ( strncmp(imm_p->io_servers[src]
  1240.                    ,config->host_id
  1241.                    ,SERVER_NAME_MAX-1) == 0 )
  1242.        {
  1243.  
  1244.            gossip_debug(GOSSIP_MIRROR_DEBUG,"Above SRC is local.\n");
  1245.            PINT_sm_push_frame(smcb, LOCAL_SRC, mirror_op);
  1246.        }
  1247.        else
  1248.        {
  1249.           /*setup msgpairarray call.  This msgpair represents a connection */
  1250.           /*between the meta server and a remote IO server.  The request   */
  1251.           /*for the remote IO server is PVFS_SERV_MIRROR, which will read  */
  1252.           /*data residing on that server and write it to a destination     */
  1253.           /*handle specified in the request.  The response returned from   */
  1254.           /*this msgpair will indicate if the copy was successful.         */
  1255.           gossip_debug(GOSSIP_MIRROR_DEBUG,"Above SRC is remote.\n");
  1256.  
  1257.           PINT_sm_msgarray_op *msgarray_op = &(mirror_op->msgarray_op);
  1258.  
  1259.           memset(msgarray_op,0,sizeof(PINT_sm_msgarray_op));
  1260.           msgarray_op->msgarray = &msgarray_op->msgpair;
  1261.           msgarray_op->count = 1;
  1262.           PINT_sm_msgpair_state *msg_p = &msgarray_op->msgpair;
  1263.  
  1264.           msg_p->req        = *req;
  1265.           msg_p->fs_id      = req->u.mirror.fs_id;
  1266.           msg_p->handle     = req->u.mirror.src_handle;
  1267.           msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
  1268.           msg_p->comp_fn    = mirror_comp_fn;
  1269.  
  1270.           /*setup msgarray parameters*/
  1271.           PINT_serv_init_msgarray_params(mirror_op,req->u.mirror.fs_id);
  1272.  
  1273.           /*determine the BMI svr address for the source handle*/
  1274.           ret = PINT_cached_config_map_to_server(&msg_p->svr_addr
  1275.                                                 ,msg_p->handle
  1276.                                                 ,msg_p->fs_id );
  1277.           if (ret)
  1278.           {
  1279.               gossip_err("Failed to map address\n");
  1280.               js_p->error_code = -1;
  1281.               return SM_ACTION_COMPLETE;
  1282.           }
  1283.  
  1284.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmsg_p->req.op:%d"
  1285.                                            "\tmsg_p->fs_id:%d"
  1286.                                            "\tmsg_p->handle:%llu\n"
  1287.                                            ,msg_p->req.op
  1288.                                            ,msg_p->fs_id
  1289.                                            ,llu(msg_p->handle) );
  1290.  
  1291.           PINT_sm_push_frame(smcb, REMOTE_SRC, mirror_op);
  1292.        }
  1293.    }/*end for (src)*/
  1294.  
  1295.  
  1296.    return SM_ACTION_COMPLETE;
  1297. }/*end action copy_data*/
  1298.  
  1299.  
  1300.  
  1301.  
  1302. static PINT_sm_action check_copy_results (struct PINT_smcb *smcb
  1303.                                          ,job_status_s *js_p)
  1304. {   
  1305.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing check_copy_results....\n");
  1306.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  1307.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  1308.    int task_id, error_code, remaining, i, j, index;
  1309.    struct PINT_server_op       *mirror_op = NULL;
  1310.    struct PVFS_servresp_mirror *respmir   = NULL;
  1311.    struct PVFS_servreq_mirror  *reqmir    = NULL;   
  1312.  
  1313.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjs_p->error_code:%d\n"
  1314.                                    ,js_p->error_code);
  1315.    js_p->error_code = 0;
  1316.  
  1317.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsm_p->op:%d\n",sm_p->op);
  1318.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsmcb->base_frame:%d"
  1319.                                     "\tsmcb->frame_count:%d\n"
  1320.                                    ,smcb->base_frame,smcb->frame_count);
  1321.  
  1322.    /*the pjmp should have pushed at least one frame*/
  1323.    assert(smcb->frame_count > (smcb->base_frame+1));
  1324.  
  1325.   do {
  1326.        mirror_op=PINT_sm_pop_frame(smcb, &task_id, &error_code, &remaining);
  1327.        respmir = &(mirror_op->resp.u.mirror);
  1328.        reqmir  = &(mirror_op->req->u.mirror);
  1329.  
  1330.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op->op:%d"
  1331.                                         "\ttask_id:%d"
  1332.                                         "\terror_code:%d(%0x)"
  1333.                                         "\tremaining:%d\n"
  1334.                                        ,mirror_op->op
  1335.                                        ,task_id
  1336.                                        ,error_code,error_code
  1337.                                        ,remaining);
  1338.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tresp.src_handle:%llu "
  1339.                                         "\tresp.src_server_nr:%d\n"
  1340.                                    ,llu(respmir->src_handle)
  1341.                                    ,respmir->src_server_nr );
  1342.        for (i=0; i<respmir->dst_count; i++)
  1343.        {
  1344.            gossip_debug(GOSSIP_MIRROR_DEBUG
  1345.                                 ,"\t\tbytes_written[%d]:%d\n"
  1346.                                  "\t\twrite_status_codde[%d]:%d\n"
  1347.                                 ,i
  1348.                                 ,respmir->bytes_written[i]
  1349.                                 ,i
  1350.                                 ,respmir->write_status_code[i]);
  1351.        }
  1352.  
  1353.        /*if error_code != 0, then NONE of the writes requested in this msgpair*/
  1354.        /*array executed, so we do not need to check the individual status     */
  1355.        /*codes.                                                               */
  1356.        if (error_code)
  1357.        {
  1358.            js_p->error_code = error_code; /*respmir has no valid data in it.*/
  1359.        } else {
  1360.            /*check the write status for each destination associated with this */
  1361.            /*particular source handle.                                        */
  1362.            for (i=0; i<respmir->dst_count; i++)
  1363.            {
  1364.               if (js_p->error_code)
  1365.               {
  1366.                   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tStatus already "
  1367.                                                    "established:%d(0x%0x)\n"
  1368.                                                   ,js_p->error_code
  1369.                                                   ,js_p->error_code);
  1370.               }
  1371.               else if (respmir->write_status_code[i])
  1372.               {
  1373.                   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tStatus came from error_"
  1374.                                                    "code %d(0x%0x)\n"
  1375.                                                   ,error_code
  1376.                                                   ,error_code);
  1377.                   js_p->error_code = respmir->write_status_code[i];
  1378.               }
  1379.               else
  1380.               {
  1381.                   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tStatus is still zero(%d)"
  1382.                                                    "\n"
  1383.                                                   ,js_p->error_code);
  1384.               }
  1385.            }/*end for*/
  1386.        }/*end if*/
  1387.  
  1388.  
  1389.        for (i=0; i<reqmir->dst_count; i++)
  1390.        {
  1391.            index = reqmir->wcIndex[i];
  1392.            if (   error_code == 0  /*this will short circuit if false*/
  1393.                && respmir->write_status_code[i] == 0)
  1394.            {
  1395.                imm_p->writes_completed[index] = 0;
  1396.            } else if (error_code == 0)
  1397.            {
  1398.                imm_p->writes_completed[index] = reqmir->dst_handle[i];
  1399.            } else
  1400.            {
  1401.                imm_p->writes_completed[index] = UINT64_HIGH;
  1402.            } 
  1403.        }
  1404.  
  1405.        switch(task_id)
  1406.        {
  1407.           case LOCAL_SRC:
  1408.           {
  1409.              gossip_debug(GOSSIP_MIRROR_DEBUG,
  1410.                                             "\tReturning from LOCAL call...\n");
  1411.              break;
  1412.           }
  1413.           case REMOTE_SRC:
  1414.           {
  1415.              /*the destory can be moved into cleanup_msgpairarray, if none of*/
  1416.              /*its values are needed in this function.                       */
  1417.              PINT_msgpairarray_destroy(&(mirror_op->msgarray_op));
  1418.              gossip_debug(GOSSIP_MIRROR_DEBUG,
  1419.                                             "\tReturning from REMOTE call .\n");
  1420.              break;
  1421.           }
  1422.        }/*end switch*/
  1423.  
  1424.        /*cleanup request/response allocations for mirror request*/
  1425.        if (reqmir->dst_handle)
  1426.            free(reqmir->dst_handle);
  1427.        if (reqmir->wcIndex)
  1428.            free(reqmir->wcIndex);
  1429.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op->req(%p)\n"
  1430.                                        ,mirror_op->req);
  1431.        free(mirror_op->req);
  1432.  
  1433.        if (respmir->bytes_written)
  1434.           free(respmir->bytes_written);
  1435.        if (respmir->write_status_code)
  1436.           free(respmir->write_status_code);
  1437.  
  1438.        free(mirror_op);
  1439.   } while (remaining > (smcb->base_frame+1));
  1440.  
  1441.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tfinal value of js_p->error_code:%d(%0x)\n"
  1442.                                    ,js_p->error_code, js_p->error_code);
  1443.  
  1444.    /*if one of the writes failed, js_p->error_code will contain an error.*/
  1445.  
  1446.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\twrites_completed array[src,server#]:\n");
  1447.    for (i=0; i<imm_p->dfile_count; i++)
  1448.    {
  1449.      for (j=0; j<imm_p->copies; j++)
  1450.      {
  1451.        index = (imm_p->dfile_count * i) + j;
  1452.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\t[%d][%d]:%llu\n"
  1453.                                        ,i,j
  1454.                                        ,llu(imm_p->writes_completed[index]));
  1455.      }
  1456.    }
  1457.  
  1458.    return SM_ACTION_COMPLETE;
  1459. }/*end action check_copy_results*/
  1460.  
  1461.  
  1462. static PINT_sm_action check_for_retries (struct PINT_smcb *smcb
  1463.                                         ,job_status_s *js_p)
  1464. {   
  1465.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing check_for_retries....\n");
  1466.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  1467.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  1468.    int i;
  1469.  
  1470.    imm_p->retry_count++;
  1471.  
  1472.    /*Have we hit the retry limit?*/
  1473.    if (imm_p->retry_count >= WRITE_RETRY_LIMIT)
  1474.    {
  1475.        js_p->error_code = 0;
  1476.        return SM_ACTION_COMPLETE;
  1477.    }
  1478.  
  1479.    /*Are there any writes to retry?*/
  1480.    for (i=0; i<(imm_p->dfile_count * imm_p->copies); i++)
  1481.    {
  1482.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\twrites_complete[%d]:%llu\n"
  1483.                                        ,i,llu(imm_p->writes_completed[i]));
  1484.        if (imm_p->writes_completed[i] != 0)
  1485.        {
  1486.           js_p->error_code = RETRY;
  1487.           return SM_ACTION_COMPLETE;
  1488.        }
  1489.    }
  1490.    
  1491.    js_p->error_code = 0;
  1492.    return SM_ACTION_COMPLETE;
  1493. }/*end state check_for_retries*/
  1494.  
  1495.  
  1496.  
  1497. static PINT_sm_action store_mirror_info (struct PINT_smcb *smcb
  1498.                                         ,job_status_s *js_p)
  1499. {   
  1500.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing store_mirror_info....\n");
  1501.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  1502.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  1503.    PVFS_handle *reorg_handles = NULL;
  1504.    int  key_count = 3;
  1505.    int ret = 0,i,j;
  1506.    job_id_t job_id;
  1507.  
  1508.    js_p->error_code = 0;
  1509.  
  1510.    /*put copy handles in proper distribution order*/
  1511.    reorg_handles = reorganize_copy_handles(imm_p);
  1512.    if (!reorg_handles)
  1513.    {
  1514.       gossip_lerr("Unable to create reorg_handles array.\n");
  1515.       js_p->error_code = -PVFS_ENOMEM;
  1516.       return SM_ACTION_COMPLETE;
  1517.    }
  1518.  
  1519.    /*setup key/val pairs*/
  1520.    sm_p->keyval_count = key_count;
  1521.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of copies:%d \tlocation:%p\n"
  1522.                                    ,imm_p->copies,&imm_p->copies);
  1523.  
  1524.    sm_p->key_a = malloc(sizeof(PVFS_ds_keyval) * sm_p->keyval_count);
  1525.    sm_p->val_a = malloc(sizeof(PVFS_ds_keyval) * sm_p->keyval_count);
  1526.  
  1527.    if (!sm_p->key_a || !sm_p->val_a)
  1528.       goto error_exit;
  1529.  
  1530.    memset(sm_p->key_a,0,sizeof(PVFS_ds_keyval) * sm_p->keyval_count);
  1531.    memset(sm_p->val_a,0,sizeof(PVFS_ds_keyval) * sm_p->keyval_count);
  1532.  
  1533.    /*setup user.pvfs2.mirror.handles*/
  1534.    i=0; assert(i<key_count);
  1535.    sm_p->key_a[i].buffer = malloc(sizeof(handle_key));
  1536.    if (!sm_p->key_a[i].buffer)
  1537.       goto error_exit;
  1538.    strcpy(sm_p->key_a[i].buffer,handle_key);
  1539.    sm_p->key_a[i].buffer_sz = sizeof(handle_key);
  1540.  
  1541.    sm_p->val_a[i].buffer = reorg_handles;
  1542.    sm_p->val_a[i].buffer_sz = sizeof(PVFS_handle) * imm_p->dfile_count
  1543.                                                   * imm_p->copies; 
  1544.  
  1545.    /*setup user.pvfs2.mirror.copies*/
  1546.    i++; assert(i<key_count);
  1547.    sm_p->key_a[i].buffer = malloc(sizeof(copy_count_key));
  1548.    if (!sm_p->key_a[i].buffer)
  1549.       goto error_exit;
  1550.    strcpy(sm_p->key_a[i].buffer,copy_count_key);
  1551.    sm_p->key_a[i].buffer_sz = sizeof(copy_count_key);
  1552.  
  1553.    sm_p->val_a[i].buffer = malloc(sizeof(imm_p->copies));
  1554.    if (!sm_p->val_a[i].buffer)
  1555.       goto error_exit;
  1556.    sm_p->val_a[i].buffer_sz = sizeof(imm_p->copies);
  1557.    memcpy(sm_p->val_a[i].buffer,&(imm_p->copies),sm_p->val_a[i].buffer_sz);
  1558.  
  1559.    /*setup user.pvfs2.mirror.status*/
  1560.    i++; assert(i<key_count);
  1561.    sm_p->key_a[i].buffer = malloc(sizeof(status_key));
  1562.    if (!sm_p->key_a[i].buffer)
  1563.       goto error_exit;
  1564.    strcpy(sm_p->key_a[i].buffer,status_key);
  1565.    sm_p->key_a[i].buffer_sz = sizeof(status_key);
  1566.  
  1567.    sm_p->val_a[i].buffer = malloc(sizeof(PVFS_handle) * imm_p->dfile_count
  1568.                                                       * imm_p->copies);
  1569.    if (!sm_p->val_a[i].buffer)
  1570.       goto error_exit;
  1571.    sm_p->val_a[i].buffer_sz = sizeof(PVFS_handle) * imm_p->dfile_count 
  1572.                                                   * imm_p->copies;
  1573.    memcpy(sm_p->val_a[i].buffer,imm_p->writes_completed
  1574.                                ,sm_p->val_a[i].buffer_sz);
  1575.    
  1576.    /*verify inputs*/
  1577.    i=0; assert(i<key_count);
  1578.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tVerify inputs BEFORE call to trove...\n");
  1579.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\t%s\n",(char *)sm_p->key_a[i].buffer);
  1580.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsize of buffer : %d\n"
  1581.                                    ,sm_p->key_a[i].buffer_sz);
  1582.    PVFS_handle *myHandle = (PVFS_handle *)sm_p->val_a[i].buffer;
  1583.    for (j=0; j<(imm_p->dfile_count*imm_p->copies); j++)
  1584.    {
  1585.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\thandle(%d):%llu\n"
  1586.        ,j
  1587.        ,llu(myHandle[j]));
  1588.    }
  1589.  
  1590.    i++; assert(i<key_count);
  1591.    int *myCount = (int *)sm_p->val_a[i].buffer;
  1592.    sm_p->val_a[i].buffer_sz = sizeof(int);
  1593.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\t%s:%d \tpointer:%p \tbuffer size:%d\n"
  1594.                                    ,(char *)sm_p->key_a[i].buffer
  1595.                                    ,*myCount,myCount
  1596.                                    ,sm_p->val_a[i].buffer_sz);
  1597.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tkey count:%d\n"
  1598.                                    ,sm_p->keyval_count);
  1599.  
  1600.    i++; assert(i<key_count);
  1601.    PVFS_handle *myStatus = (PVFS_handle *)sm_p->val_a[i].buffer;
  1602.    for (j=0; j<(imm_p->dfile_count * imm_p->copies); j++)
  1603.    {
  1604.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle(%d):status(%llu)\n"
  1605.                                        ,j
  1606.                                        ,llu(myStatus[j]));
  1607.    }
  1608.  
  1609.    /*store keys*/
  1610.    ret = job_trove_keyval_write_list( imm_p->fs_id
  1611.                                      ,imm_p->metadata_handle
  1612.                                      ,sm_p->key_a
  1613.                                      ,sm_p->val_a
  1614.                                      ,sm_p->keyval_count
  1615.                                  ,TROVE_SYNC /*trove flags*/
  1616.                                      ,NULL
  1617.                                      ,smcb
  1618.                                      ,0
  1619.                                      ,js_p
  1620.                                      ,&job_id
  1621.                                      ,server_job_context
  1622.                                      ,NULL);
  1623.  
  1624.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of ret from call to trove : %d\n"
  1625.                                    ,ret);
  1626.  
  1627.    i=0; assert(i<key_count);
  1628.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tVerify inputs AFTER call to trove...\n");
  1629.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\t%s\n",(char *)sm_p->key_a[i].buffer);
  1630.    for (j=0; j<(imm_p->dfile_count * imm_p->copies); j++)
  1631.    {
  1632.        PVFS_handle *myHandle = (PVFS_handle *)sm_p->val_a[i].buffer;
  1633.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\thandle(%d):%llu\n"
  1634.        ,j
  1635.        ,llu(myHandle[j]));
  1636.    }
  1637.  
  1638.    i++; assert(i<key_count);
  1639.    myCount = (int *)sm_p->val_a[i].buffer;
  1640.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\t%s:%d \tpointer:%p\n"
  1641.                                    ,(char *)sm_p->key_a[i].buffer
  1642.                                    ,*myCount
  1643.                                    ,sm_p->val_a[i].buffer);
  1644.  
  1645.  
  1646.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tvalue of ret from trove call : %d\n"
  1647.                                    ,ret);
  1648.    return (ret);
  1649.  
  1650. error_exit:
  1651.    for (i=0; i<sm_p->keyval_count; i++)
  1652.    {
  1653.        if (sm_p->key_a && sm_p->key_a[i].buffer)
  1654.           free(sm_p->key_a[i].buffer);
  1655.        if (sm_p->val_a && sm_p->val_a[i].buffer)
  1656.           free(sm_p->val_a[i].buffer);
  1657.    }
  1658.  
  1659.    if (sm_p->key_a)
  1660.       free(sm_p->key_a);
  1661.    if (sm_p->val_a)
  1662.       free(sm_p->val_a);
  1663.  
  1664.    js_p->error_code = -PVFS_ENOMEM;
  1665.    return SM_ACTION_COMPLETE;
  1666. }/*end action store_mirror_info*/
  1667.  
  1668.  
  1669.  
  1670. static PINT_sm_action check_store_job (struct PINT_smcb *smcb
  1671.                                       ,job_status_s *js_p)
  1672. {   
  1673.    gossip_debug(GOSSIP_MIRROR_DEBUG,
  1674.                       "Executing check_store_job....\n");
  1675.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  1676.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  1677.    int i;
  1678.   
  1679.  
  1680.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjs_p->error_code:%d\n"
  1681.                                    ,js_p->error_code);
  1682.  
  1683.    if (js_p->error_code)
  1684.    {
  1685.       gossip_err("Unable to store datahandles and number of copies for this "
  1686.                  "mirror operation.\n");
  1687.       gossip_err("\tMeta data handle is %llu\n",llu(imm_p->metadata_handle));
  1688.       return SM_ACTION_COMPLETE;
  1689.    }
  1690.  
  1691.    /*release memory used in previous job call*/
  1692.    for (i=0; i<sm_p->keyval_count; i++)
  1693.    {
  1694.           free(sm_p->key_a[i].buffer);
  1695.           free(sm_p->val_a[i].buffer);
  1696.    }
  1697.    free(sm_p->key_a);
  1698.    free(sm_p->val_a);
  1699.    sm_p->key_a = sm_p->val_a = NULL;
  1700.    sm_p->keyval_count = 0;
  1701.  
  1702.    js_p->error_code = 0;
  1703.  
  1704.    return SM_ACTION_COMPLETE;
  1705. }/*end state check_store_job*/
  1706.  
  1707.  
  1708.  
  1709.  
  1710. static PINT_sm_action replace_remote_datahandle_objects(struct PINT_smcb *smcb
  1711.                                                        ,job_status_s *js_p)
  1712. {   
  1713.    gossip_debug(GOSSIP_MIRROR_DEBUG,
  1714.                       "Executing replace_remote_datahandle_objects....\n");
  1715.  
  1716.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  1717.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  1718.    job_id_t job_id;
  1719.    int ret;  
  1720.    int tmpindex;
  1721.    PVFS_handle pool_handle;
  1722.  
  1723.    js_p->error_code = 0;
  1724.  
  1725.    if (!imm_p->handle_array_copies_remote)
  1726.    {
  1727.        gossip_debug(GOSSIP_MIRROR_DEBUG,"handle_array_copies_remote is "
  1728.                                         "null: %p\n"
  1729.                                         ,imm_p->handle_array_copies_remote);
  1730.        js_p->error_code = REPLACE_DONE;
  1731.        return SM_ACTION_COMPLETE;
  1732.    }
  1733.  
  1734.    imm_p->handle_array_copies_remote_count--;
  1735.    if (imm_p->handle_array_copies_remote_count < 0)
  1736.    {
  1737.        js_p->error_code = REPLACE_DONE;
  1738.        return SM_ACTION_COMPLETE;
  1739.    }
  1740.  
  1741.    tmpindex = imm_p->handle_array_copies_remote_count;
  1742.  
  1743.    /* find the pool that this handle belongs to */
  1744.    ret = job_precreate_pool_lookup_server( imm_p->remote_io_servers[tmpindex],
  1745.                                            PVFS_TYPE_DATAFILE, 
  1746.                                           imm_p->fs_id
  1747.                                           ,&pool_handle );
  1748.    if (ret < 0)
  1749.    {
  1750.       imm_p->handle_array_copies_remote_count++;
  1751.       js_p->error_code = ret;
  1752.       return SM_ACTION_COMPLETE;
  1753.    }
  1754.  
  1755.    ret = job_precreate_pool_fill( pool_handle
  1756.                                  ,imm_p->fs_id
  1757.                                  ,&(imm_p->handle_array_copies_remote[tmpindex])
  1758.                                  ,1
  1759.                                  ,smcb
  1760.                                  ,0
  1761.                                  ,js_p
  1762.                                  ,&job_id
  1763.                                  ,server_job_context
  1764.                                  ,NULL );
  1765.  
  1766.    return ret;
  1767. }/*end action replace_remote_datahandle_objects*/
  1768.  
  1769.  
  1770.  
  1771. static PINT_sm_action remove_local_datahandle_objects(struct PINT_smcb *smcb
  1772.                                                      ,job_status_s *js_p)
  1773. {   
  1774.    gossip_debug(GOSSIP_MIRROR_DEBUG,
  1775.                              "Executing remove_local_datahandle_objects....\n");
  1776.   
  1777.    struct PINT_server_op *sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  1778.    PINT_server_create_copies_op *imm_p = &(sm_p->u.create_copies);
  1779.    job_id_t job_id;
  1780.    int ret;
  1781.  
  1782.    if (js_p->error_code)
  1783.        imm_p->saved_error_code = js_p->error_code;
  1784.  
  1785.    js_p->error_code = 0;
  1786.  
  1787.    if (!imm_p->handle_array_copies_local)
  1788.        return SM_ACTION_COMPLETE;
  1789.  
  1790.  
  1791.    ret = job_trove_dspace_remove_list( imm_p->fs_id
  1792.                                       ,imm_p->handle_array_copies_local
  1793.                                       ,NULL
  1794.                                       ,imm_p->handle_array_copies_local_count
  1795.                                       ,TROVE_SYNC
  1796.                                       ,smcb
  1797.                                       ,0
  1798.                                       ,js_p
  1799.                                       ,&job_id
  1800.                                       ,server_job_context
  1801.                                       ,NULL );
  1802.  
  1803.    return ret;
  1804. }/*end action remove_local_datahandle_objects*/
  1805.  
  1806.  
  1807.  
  1808.  
  1809. static PINT_sm_action cleanup (struct PINT_smcb *smcb, job_status_s *js_p)
  1810. {   
  1811.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing cleanup....\n");
  1812.  
  1813.    struct PINT_server_op *sm_p = NULL;
  1814.    PINT_server_create_copies_op *imm_p = NULL;
  1815.    int i;
  1816.  
  1817.    sm_p = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  1818.    imm_p = &(sm_p->u.create_copies);
  1819.  
  1820.    if (js_p->error_code == NOTHING_TO_DO)
  1821.        js_p->error_code = 0;
  1822.  
  1823.    if (imm_p->my_remote_servers)
  1824.        free(imm_p->my_remote_servers);
  1825.  
  1826.    if (imm_p->writes_completed)
  1827.        free(imm_p->writes_completed);
  1828.  
  1829.    if (imm_p->handle_array_copies_local)
  1830.        free(imm_p->handle_array_copies_local);
  1831.  
  1832.    if (imm_p->handle_array_copies_remote)
  1833.        free(imm_p->handle_array_copies_remote);
  1834.  
  1835.    if (imm_p->remote_io_servers)
  1836.    {
  1837.        for (i=0;i<imm_p->remote_io_servers_count;i++)
  1838.             free(imm_p->remote_io_servers[i]);
  1839.        free(imm_p->remote_io_servers);
  1840.    }
  1841.  
  1842.    if (imm_p->local_io_servers)
  1843.    {
  1844.       for (i=0; i<imm_p->local_io_servers_count; i++)
  1845.           free(imm_p->local_io_servers[i]);
  1846.       free(imm_p->local_io_servers);
  1847.    }
  1848.  
  1849.    if (imm_p->handle_array_base)
  1850.        free(imm_p->handle_array_base);
  1851.  
  1852.    if (imm_p->handle_array_base_local)
  1853.        free(imm_p->handle_array_base_local);
  1854.  
  1855.    if (imm_p->handle_array_copies)
  1856.        free(imm_p->handle_array_copies);
  1857.  
  1858.    if (imm_p->io_servers)
  1859.    {
  1860.       for (i=0;i<imm_p->io_servers_required;i++)
  1861.           free(imm_p->io_servers[i]);
  1862.       free(imm_p->io_servers);
  1863.    }
  1864.  
  1865.    if (imm_p->ds_attr_a)
  1866.       free(imm_p->ds_attr_a);
  1867.  
  1868.    if (imm_p->bstream_array_base_local)
  1869.       free(imm_p->bstream_array_base_local);
  1870.  
  1871.    if (!js_p->error_code && imm_p->saved_error_code)
  1872.       js_p->error_code = imm_p->saved_error_code;
  1873.  
  1874.    if (imm_p->dist && imm_p->dist->dist_name)
  1875.       free(imm_p->dist->dist_name);
  1876.    if (imm_p->dist)
  1877.       free(imm_p->dist);
  1878.  
  1879.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Leaving cleanup: error_code:%d.....\n"
  1880.                                    ,js_p->error_code);
  1881.  
  1882.       
  1883.    return SM_ACTION_COMPLETE;
  1884. }/*end action cleanup*/
  1885.  
  1886.  
  1887. int mirror_comp_fn(void *v_p, struct PVFS_server_resp *resp_p, int i)
  1888. {
  1889.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing mirror_comp_fn.....\n");
  1890.  
  1891.    PINT_smcb *smcb = v_p;
  1892.    struct PINT_server_op *mirror_op = 
  1893.                                    PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
  1894.    struct PVFS_servresp_mirror *respmir = &(mirror_op->resp.u.mirror); 
  1895.    int k;
  1896.  
  1897.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror_op:%p\n",mirror_op);
  1898.  
  1899.    /* only posted one msgpair per source handle*/
  1900.    assert(i==0);
  1901.  
  1902.    /* If the response status is non-zero, then the rest of the response is  */
  1903.    /* NOT encoded in final-response.sm.  So, there are no values to access. */
  1904.    /* NOTE: An error code will be returned in the status field IFF NONE of  */
  1905.    /* the writes were successful.  Otherwise, the status of each write will */
  1906.    /* be contained in the write_status_code field.                          */ 
  1907.    if (resp_p->status != 0)
  1908.       return(resp_p->status);
  1909.  
  1910.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tresp->src_handle:%llu "
  1911.                                     "\tresp->src_server_nr:%d "
  1912.                                     "\tresp->status:%d\n"
  1913.                                    ,llu(resp_p->u.mirror.src_handle)
  1914.                                    ,resp_p->u.mirror.src_server_nr
  1915.                                    ,resp_p->status );
  1916.    for (k=0; k<resp_p->u.mirror.dst_count; k++)
  1917.    {
  1918.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tresp->bytes_written[%d]:%d"
  1919.                                         "\tresp->write_status_code[%d]:%d\n"
  1920.                                        ,k
  1921.                                        ,resp_p->u.mirror.bytes_written[k]
  1922.                                        ,k
  1923.                                        ,resp_p->u.mirror.write_status_code[k]);
  1924.    }
  1925.  
  1926.    assert(mirror_op->op == PVFS_SERV_MIRROR);
  1927.  
  1928.    memset(&(mirror_op->resp),0,sizeof(mirror_op->resp));
  1929.  
  1930.    /*capture information from the mirror operation.*/
  1931.    respmir->src_handle    = resp_p->u.mirror.src_handle;
  1932.    respmir->src_server_nr = resp_p->u.mirror.src_server_nr;
  1933.    respmir->dst_count     = resp_p->u.mirror.dst_count;
  1934.  
  1935.    respmir->bytes_written = malloc(sizeof(uint32_t) * respmir->dst_count);
  1936.    if (!respmir->bytes_written)
  1937.    {
  1938.       gossip_lerr("Unable to allocate respmir->bytes_written\n");
  1939.       return (-PVFS_ENOMEM);
  1940.    }
  1941.    memset(respmir->bytes_written,0,sizeof(uint32_t) * respmir->dst_count);
  1942.  
  1943.    respmir->write_status_code = malloc(sizeof(uint32_t) * respmir->dst_count);
  1944.    if (!respmir->write_status_code)
  1945.    {
  1946.        gossip_lerr("Unable to allocate respmir->write_status_code.\n");
  1947.        return (-PVFS_ENOMEM);
  1948.    }
  1949.    memset(respmir->write_status_code,0,sizeof(uint32_t) * respmir->dst_count);
  1950.  
  1951.    memcpy(respmir->bytes_written,resp_p->u.mirror.bytes_written
  1952.          ,sizeof(uint32_t) * respmir->dst_count);
  1953.    memcpy(respmir->write_status_code,resp_p->u.mirror.write_status_code
  1954.          ,sizeof(uint32_t) * respmir->dst_count);
  1955.  
  1956.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsmcb->base_frame:%d\tframe_count:%d\n"
  1957.                                    ,smcb->base_frame,smcb->frame_count);
  1958.  
  1959.    return(0);
  1960. } /*end msgpair completion function mirror_comp_fn*/
  1961.  
  1962.  
  1963.  
  1964.  
  1965. static PVFS_handle *reorganize_copy_handles(
  1966.                     struct PINT_server_create_copies_op *imm_p)
  1967. {
  1968.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing reorganize_copy_handles..\n");
  1969.  
  1970.    uint64_t i,j,k,rows,in_cols,out_cols;
  1971.    PVFS_handle *copies_out = NULL;
  1972.    PVFS_handle *copies_in  = imm_p->handle_array_copies;
  1973.  
  1974.  
  1975.    rows     = imm_p->copies;
  1976.    in_cols  = imm_p->io_servers_required;
  1977.    out_cols = imm_p->dfile_count;
  1978.  
  1979.    /* allocate copies_out array */
  1980.    copies_out = malloc(sizeof(PVFS_handle) * rows * out_cols);
  1981.    if (!copies_out)
  1982.    {
  1983.       gossip_lerr("Unable to allocate memeory.\n");
  1984.       return (NULL);
  1985.    }
  1986.    memset(copies_out,0,sizeof(PVFS_handle) * rows * out_cols);
  1987.  
  1988.    for (i=0; i<(in_cols*rows); i++)
  1989.    {
  1990.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\thandle_array_copies(%d):%llu\n"
  1991.                                        ,(int)i
  1992.                                        ,llu(copies_in[i]));
  1993.    }
  1994.  
  1995.    /*this code copies copies_in[n+1] to copies_out[n] within the same row*/
  1996.    /*each row represents one copy of the logical file, i.e., each of its */
  1997.    /*datahandles.                                                        */
  1998.    for (i=0,k=1; i<rows; i++,k++)
  1999.    {
  2000.        for (j=0; j<out_cols; j++)
  2001.        {
  2002.            copies_out[(i*out_cols)+j] = copies_in[(i*in_cols)+((j+k)%in_cols)];
  2003.        }
  2004.    }
  2005.    
  2006.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tReorg'd Handles Array:\n");
  2007.    for (i=0; i<rows; i++)
  2008.    {
  2009.        for (j=0; j<out_cols; j++)
  2010.        {
  2011.            gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\t(%d,%d):%llu\n"
  2012.                                 ,(int)i,(int)j
  2013.                                 ,llu(copies_out[(i*out_cols)+j]));
  2014.        }
  2015.    }
  2016.  
  2017.    return(copies_out);
  2018. }/*end function reorganize_copy_handles*/
  2019.  
  2020. static int get_server_names(PINT_server_create_copies_op *imm_p)
  2021. {
  2022.   char **list=NULL;
  2023.   int size=0
  2024.      ,i,j,ret;
  2025.  
  2026.   gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing get_server_names....\n");
  2027.  
  2028.   gossip_debug(GOSSIP_MIRROR_DEBUG,    "\tio_servers(before):\n");
  2029.   for (i=0; i<imm_p->io_servers_required; i++)
  2030.       gossip_debug(GOSSIP_MIRROR_DEBUG,"\t              [%d]:%s"
  2031.                                        "\tlength:%d\n"
  2032.                                       ,i
  2033.                                       ,imm_p->io_servers[i]
  2034.                                       ,(int)strlen(imm_p->io_servers[i]));
  2035.  
  2036.  
  2037.  
  2038.   /*Get access to the io server names residing in the cache*/
  2039.   ret = PINT_cached_config_io_server_names(&list,&size,imm_p->fs_id);
  2040.   if (ret)
  2041.   {
  2042.       if (list)
  2043.          free(list);
  2044.       gossip_lerr("Unable to retrieve IO server names from the cache.\n");
  2045.       return(ret);
  2046.   }
  2047.  
  2048.   gossip_debug(GOSSIP_MIRROR_DEBUG,"\tReturned from PINT_cached_config...\n");
  2049.   for (i=0; i<size; i++)
  2050.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tValue of      size:%d\n"
  2051.                                      "\t\t\t\tValue of   list[%d]:%p\n"
  2052.                                      "\t\t\t\tValue of  *list[%d]:%s\n"
  2053.                                      "\t\t\t\tstrlen of  list[%d]:%d\n\n"
  2054.                                     ,size
  2055.                                     ,i,list[i],i,list[i]
  2056.                                     ,i,(int)strlen(list[i]));
  2057.  
  2058.   /*Remove server names that are already being used in the io_servers list*/
  2059.   for (i=0; i<size; i++)
  2060.   {
  2061.      for (j=0; j<imm_p->dfile_count; j++)
  2062.      {
  2063.         if (strncmp(list[i],imm_p->io_servers[j],strlen(list[i])) == 0)
  2064.         {
  2065.          list[i] = NULL;
  2066.          break;
  2067.         }
  2068.      }/*end for*/
  2069.   }/*end for*/
  2070.  
  2071.   /*Add server names to io_servers list*/
  2072.   for (i=0,j=imm_p->dfile_count; i<size && j<imm_p->io_servers_required; i++)
  2073.   {
  2074.      if (list[i])
  2075.      {
  2076.         strncpy(imm_p->io_servers[j],list[i],SERVER_NAME_MAX-1);
  2077.         j++;
  2078.      }
  2079.   }/*end for*/
  2080.  
  2081.   gossip_debug(GOSSIP_MIRROR_DEBUG,    "\tio_servers(after):\n");
  2082.   for (i=0; i<imm_p->io_servers_required; i++)
  2083.       gossip_debug(GOSSIP_MIRROR_DEBUG,"\t             [%d]:%s\n"
  2084.                                       ,i
  2085.                                       ,imm_p->io_servers[i]);
  2086.  
  2087.   /*deallocate memory used for "list"*/
  2088.   free(list);
  2089.  
  2090.   return (0);
  2091. }/*end function get_server_names*/
  2092.  
  2093. /******************************************************************************/
  2094. /* Right now, this state machine is not called as a standalone request. It is */
  2095. /* only called as a nested machine from seteattr; however, when time comes to */
  2096. /* create a standalone server request, the values used for the request        */
  2097. /* parameters are listed below.                                               */
  2098. /******************************************************************************/
  2099. static inline int PINT_get_object_ref_copies( struct PVFS_server_req *req
  2100.                                              ,PVFS_fs_id *fs_id
  2101.                                              ,PVFS_handle *handle )
  2102. {
  2103.    *fs_id  = req->u.seteattr.fs_id;
  2104.    *handle = req->u.seteattr.handle;     
  2105.  
  2106.     return 0;
  2107. };
  2108.  
  2109. /*request parameters*/
  2110. struct PINT_server_req_params pvfs2_create_immutable_copies_params =
  2111. {
  2112.     .string_name = "create_immutable_copies",
  2113.     .perm = PINT_SERVER_CHECK_NONE,
  2114.     .access_type = PINT_server_req_modify,
  2115.     .sched_policy = PINT_SERVER_REQ_SCHEDULE,
  2116.     .get_object_ref = PINT_get_object_ref_copies,
  2117.     .state_machine = &pvfs2_create_immutable_copies_sm
  2118. };
  2119.  
  2120. /****** E N D  O F  F I L E *******************************/
  2121.