home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / client / sysint / sys-readdirplus.sm < prev    next >
Text File  |  2009-09-03  |  36KB  |  969 lines

  1. /* 
  2.  * (C) 2003 Clemson University and The University of Chicago 
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. /** \file
  8.  *  \ingroup sysint
  9.  *
  10.  *  PVFS2 system interface routines for reading entries from a directory
  11.  *  and also filling in the attribute information for each entry.
  12.  *  First step involves fetching all directory entries and their associated meta
  13.  *  handles, data file handles from the server responsible for the directory.
  14.  *  Second step involves sending requests to all servers to fetch attributes (dfile/meta handle)
  15.  *  in parallel.
  16.  */
  17.  
  18. #include <string.h>
  19. #include <assert.h>
  20.  
  21. #include "client-state-machine.h"
  22. #include "pvfs2-debug.h"
  23. #include "job.h"
  24. #include "gossip.h"
  25. #include "str-utils.h"
  26. #include "pint-cached-config.h"
  27. #include "PINT-reqproto-encode.h"
  28. #include "ncache.h"
  29. #include "pint-util.h"
  30. #include "pvfs2-internal.h"
  31.  
  32. enum {
  33.     NO_WORK = 1
  34. };
  35.  
  36. extern job_context_id pint_client_sm_context;
  37.  
  38. static int readdirplus_fetch_attrs_comp_fn(void *v_p,
  39.                                struct PVFS_server_resp *resp_p,
  40.                                int index);
  41.  
  42. static int readdirplus_fetch_sizes_comp_fn(void *v_p,
  43.                                struct PVFS_server_resp *resp_p,
  44.                                int index);
  45.  
  46. %%
  47.  
  48. machine pvfs2_client_readdirplus_sm
  49. {
  50.     state init
  51.     {
  52.         jump pvfs2_client_readdir_sm;
  53.         success => readdirplus_fetch_attrs_setup_msgpair;
  54.         default => readdirplus_msg_failure;
  55.     }
  56.  
  57.     state readdirplus_fetch_attrs_setup_msgpair
  58.     {
  59.         run readdirplus_fetch_attrs_setup_msgpair;
  60.         NO_WORK => cleanup;
  61.         success => readdirplus_fetch_attrs_xfer_msgpair;
  62.         default => readdirplus_msg_failure;
  63.     }
  64.  
  65.     state readdirplus_fetch_attrs_xfer_msgpair
  66.     {
  67.         jump pvfs2_msgpairarray_sm;
  68.         success => readdirplus_fetch_sizes_setup_msgpair;
  69.         default => readdirplus_msg_failure;
  70.     }
  71.  
  72.     state readdirplus_fetch_sizes_setup_msgpair
  73.     {
  74.         run readdirplus_fetch_sizes_setup_msgpair;
  75.         NO_WORK => cleanup;
  76.         success => readdirplus_fetch_sizes_xfer_msgpair;
  77.         default => readdirplus_msg_failure;
  78.     }
  79.  
  80.     state readdirplus_fetch_sizes_xfer_msgpair
  81.     {
  82.         jump pvfs2_msgpairarray_sm;
  83.         success => cleanup;
  84.         default => readdirplus_msg_failure;
  85.     }
  86.  
  87.     state readdirplus_msg_failure
  88.     {
  89.         run readdirplus_msg_failure;
  90.         default => cleanup;
  91.     }
  92.  
  93.     state cleanup
  94.     {
  95.         run readdirplus_cleanup;
  96.         default => terminate;
  97.     }
  98. }
  99.  
  100. %%
  101.  
  102. /** Initiate reading of entries from a directory and their associated attributes.
  103.  *
  104.  *  \param token opaque value used to track position in directory
  105.  *         when more than one read is required.
  106.  *  \param pvfs_dirent_incount maximum number of entries to read, if
  107.  *         available, starting from token.
  108.  */
  109. PVFS_error PVFS_isys_readdirplus(
  110.     PVFS_object_ref ref,
  111.     PVFS_ds_position token, 
  112.     int32_t pvfs_dirent_incount,
  113.     const PVFS_credentials *credentials,
  114.     uint32_t attrmask,
  115.     PVFS_sysresp_readdirplus *resp,
  116.     PVFS_sys_op_id *op_id,
  117.     PVFS_hint hints,
  118.     void *user_ptr)
  119. {
  120.     PVFS_error ret = -PVFS_EINVAL;
  121.     PINT_client_sm *sm_p = NULL;
  122.     PINT_smcb *smcb = NULL;
  123.  
  124.     gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_readdirplus entered\n");
  125.  
  126.     if ((ref.handle == PVFS_HANDLE_NULL) ||
  127.         (ref.fs_id == PVFS_FS_ID_NULL) ||
  128.         (resp == NULL))
  129.     {
  130.         gossip_err("invalid (NULL) required argument\n");
  131.         return ret;
  132.     }
  133.  
  134.     if (pvfs_dirent_incount > PVFS_REQ_LIMIT_DIRENT_COUNT)
  135.     {
  136.         gossip_lerr("PVFS_isys_readdirplus unable to handle request "
  137.                     "for %d entries.\n", pvfs_dirent_incount);
  138.         return ret;
  139.     }
  140.  
  141.     PINT_smcb_alloc(&smcb, PVFS_SYS_READDIRPLUS,
  142.              sizeof(struct PINT_client_sm),
  143.              client_op_state_get_machine,
  144.              client_state_machine_terminate,
  145.              pint_client_sm_context);
  146.     if (smcb == NULL)
  147.     {
  148.         return -PVFS_ENOMEM;
  149.     }
  150.     sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  151.  
  152.     PINT_init_msgarray_params(sm_p, ref.fs_id);
  153.     PINT_init_sysint_credentials(sm_p->cred_p, credentials);
  154.     sm_p->object_ref = ref;
  155.     PVFS_hint_copy(hints, &sm_p->hints);
  156.     /* point the sm dirent array and outcount to the readdirplus response field */
  157.     sm_p->readdir.dirent_array = &resp->dirent_array;
  158.     sm_p->readdir.dirent_outcount = &resp->pvfs_dirent_outcount;
  159.     sm_p->readdir.token = &resp->token;
  160.     sm_p->readdir.directory_version = &resp->directory_version;
  161.  
  162.     sm_p->readdir.pos_token = sm_p->u.readdirplus.pos_token = token;
  163.     sm_p->readdir.dirent_limit = sm_p->u.readdirplus.dirent_limit = pvfs_dirent_incount;
  164.     /* We store the object attr mask in the sm structure */
  165.     sm_p->u.readdirplus.attrmask = PVFS_util_sys_to_object_attr_mask(attrmask);
  166.     sm_p->u.readdirplus.readdirplus_resp = resp;
  167.     sm_p->u.readdirplus.svr_count = 0;
  168.     sm_p->u.readdirplus.size_array = NULL;
  169.     sm_p->u.readdirplus.nhandles = 0;
  170.     sm_p->u.readdirplus.obj_attr_array = NULL;
  171.     sm_p->u.readdirplus.server_addresses = NULL;
  172.     sm_p->u.readdirplus.handle_count = NULL;
  173.     sm_p->u.readdirplus.handles = NULL;
  174.  
  175.     gossip_debug(GOSSIP_READDIR_DEBUG, "Doing readdirplus on handle "
  176.                  "%llu on fs %d\n", llu(ref.handle), ref.fs_id);
  177.  
  178.     return PINT_client_state_machine_post(
  179.         smcb, op_id, user_ptr);
  180. }
  181.  
  182. /** Read entries from a directory and their associated attributes
  183.  *  in an efficient manner.
  184.  *
  185.  *  \param token opaque value used to track position in directory
  186.  *         when more than one read is required.
  187.  *  \param pvfs_dirent_incount maximum number of entries to read, if
  188.  *         available, starting from token.
  189.  */
  190. PVFS_error PVFS_sys_readdirplus(
  191.     PVFS_object_ref ref,
  192.     PVFS_ds_position token, 
  193.     int32_t pvfs_dirent_incount,
  194.     const PVFS_credentials *credentials,
  195.     uint32_t attrmask,
  196.     PVFS_sysresp_readdirplus *resp,
  197.     PVFS_hint hints)
  198. {
  199.     PVFS_error ret = -PVFS_EINVAL, error = 0;
  200.     PVFS_sys_op_id op_id;
  201.  
  202.     gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_readdirplus entered\n");
  203.  
  204.     ret = PVFS_isys_readdirplus(ref, token, pvfs_dirent_incount,
  205.                             credentials, attrmask, resp, &op_id, NULL, hints);
  206.     if (ret)
  207.     {
  208.         PVFS_perror_gossip("PVFS_isys_readdirplus call", ret);
  209.         error = ret;
  210.     }
  211.     else
  212.     {
  213.         ret = PVFS_sys_wait(op_id, "readdirplus", &error);
  214.         if (ret)
  215.         {
  216.             PVFS_perror_gossip("PVFS_sys_wait call", ret);
  217.             error = ret;
  218.         }
  219.     }
  220.  
  221.     PINT_sys_release(op_id);
  222.     return error;
  223. }
  224.  
  225. /****************************************************************/
  226.  
  227. static int get_handle_index(struct handle_to_index *input_handle_array, int nhandles, PVFS_handle given_handle, int *primary_index, int *secondary_index)
  228. {
  229.     int i;
  230.  
  231.     for (i = 0; i < nhandles; i++) {
  232.         if (input_handle_array[i].handle == given_handle) {
  233.             if (primary_index)
  234.                 *primary_index = input_handle_array[i].handle_index;
  235.             if (secondary_index)
  236.                 *secondary_index = input_handle_array[i].aux_index;
  237.             return 0;
  238.         }
  239.     }
  240.     if (primary_index) 
  241.         *primary_index = -1;
  242.     if (secondary_index)
  243.         *secondary_index = -1;
  244.  
  245.     return -1;
  246. }
  247.  
  248. static int is_unique_server(PVFS_BMI_addr_t svr_addr, int svr_count, 
  249.     PVFS_BMI_addr_t *svr_addr_array, int *svr_index)
  250. {
  251.     int i, ret;
  252.  
  253.     ret = 1;
  254.     if (svr_count == 0 || svr_addr_array == NULL) {
  255.         goto out;
  256.     }
  257.     for (i = 0; i < svr_count; i++) {
  258.         if (svr_addr_array[i] == svr_addr) {
  259.             if (svr_index)
  260.                 *svr_index = i;
  261.             ret = 0;
  262.             goto out;
  263.         }
  264.     }
  265. out:
  266.     return ret;
  267. }
  268.  
  269. static int destroy_partition_handles(int *svr_count,
  270.                              PVFS_BMI_addr_t **svr_addr_array,
  271.                              int **per_server_handle_count,
  272.                              PVFS_handle ***per_server_handles)
  273. {
  274.     int i;
  275.     if (*svr_addr_array)
  276.     {
  277.         free(*svr_addr_array);
  278.         *svr_addr_array = NULL;
  279.     }
  280.     if (*per_server_handles && *per_server_handle_count)
  281.     {
  282.         for (i = 0; i < (*svr_count); i++) {
  283.             if ((*per_server_handles)[i]) {
  284.                 free((*per_server_handles)[i]);
  285.                 (*per_server_handles)[i] = NULL;
  286.             }
  287.         }
  288.     }
  289.     if (*per_server_handles) {
  290.         free(*per_server_handles);
  291.         *per_server_handles = NULL;
  292.     }
  293.     if (*per_server_handle_count) {
  294.         free(*per_server_handle_count);
  295.         *per_server_handle_count = NULL;
  296.     }
  297.     (*svr_count) = 0;
  298.     return 0;
  299. }
  300.  
  301. static int create_partition_handles(PVFS_fs_id fsid, int input_handle_count, 
  302.                              struct handle_to_index *input_handle_array,
  303.                              int *svr_count, PVFS_BMI_addr_t **svr_addr_array,
  304.                              int **per_server_handle_count,
  305.                              PVFS_handle ***per_server_handles)
  306. {
  307.     int i, err = 0;
  308.     PVFS_BMI_addr_t tmp_svr_addr;
  309.  
  310.     *svr_count = 0;
  311.     *svr_addr_array = NULL;
  312.     *per_server_handle_count = NULL;
  313.     *per_server_handles =  NULL;
  314.  
  315.     do {
  316.         for (i = 0; i < input_handle_count; i++) 
  317.         {
  318.             int ret;
  319.             ret = PINT_cached_config_map_to_server(&tmp_svr_addr,
  320.                 input_handle_array[i].handle, fsid);
  321.             if (ret)
  322.             {
  323.                 gossip_err("Failed to map server address\n");
  324.                 err = ret;
  325.                 break;
  326.             }
  327.             /* unique server address */
  328.             if (is_unique_server(tmp_svr_addr, *svr_count,
  329.                 *svr_addr_array, NULL) == 1) 
  330.             {
  331.                 (*svr_count)++;
  332.                 *svr_addr_array = (PVFS_BMI_addr_t *) 
  333.                     realloc(*svr_addr_array, (*svr_count) * sizeof(PVFS_BMI_addr_t));
  334.                 if (*svr_addr_array == NULL)
  335.                 {
  336.                     gossip_err("Could not allocate server address\n");
  337.                     err = -PVFS_ENOMEM;
  338.                     break;
  339.                 }
  340.                 (*svr_addr_array)[(*svr_count) - 1] = tmp_svr_addr;
  341.             }
  342.         }
  343.         if (i != input_handle_count) {
  344.             break;
  345.         }
  346.         *per_server_handle_count = 
  347.             (int *) calloc(*svr_count, sizeof(int));
  348.         if (*per_server_handle_count == NULL) {
  349.             err = -PVFS_ENOMEM;
  350.             break;
  351.         }
  352.         *per_server_handles = (PVFS_handle **) 
  353.             calloc(*svr_count, sizeof(PVFS_handle *));
  354.         if (*per_server_handles == NULL) {
  355.             err = -PVFS_ENOMEM;
  356.             break;
  357.         }
  358.         for (i = 0; i < input_handle_count; i++) 
  359.         {
  360.             int svr_index = 0;
  361.             int ret;
  362.             ret = PINT_cached_config_map_to_server(&tmp_svr_addr,
  363.                 input_handle_array[i].handle, fsid);
  364.             /* unique server address to find index */
  365.             is_unique_server(tmp_svr_addr, *svr_count,
  366.                 *svr_addr_array, &svr_index); 
  367.             (*per_server_handle_count)[svr_index]++;
  368.             (*per_server_handles)[svr_index] = (PVFS_handle *) 
  369.                 realloc((*per_server_handles)[svr_index], 
  370.                     (*per_server_handle_count)[svr_index] * sizeof(PVFS_handle));
  371.             if ((*per_server_handles)[svr_index] == NULL) {
  372.                 err = -PVFS_ENOMEM;
  373.                 break;
  374.             }
  375.             (*per_server_handles)[svr_index][(*per_server_handle_count)[svr_index] - 1] = input_handle_array[i].handle;
  376.         }
  377.         if (i != input_handle_count)
  378.             break;
  379.     } while (0);
  380.     return err;
  381. }
  382.  
  383. /* figure out which meta servers need to be contacted */
  384. static int list_of_meta_servers(PINT_client_sm *sm_p)
  385. {
  386.     PVFS_sysresp_readdirplus *readdirplus_resp = sm_p->u.readdirplus.readdirplus_resp;
  387.     int i, ret, err_array_len, attr_array_len;
  388.  
  389.     assert(readdirplus_resp);
  390.     err_array_len = (sizeof(PVFS_error) *
  391.          readdirplus_resp->pvfs_dirent_outcount);
  392.     attr_array_len =
  393.         (sizeof(PVFS_sys_attr) *
  394.          readdirplus_resp->pvfs_dirent_outcount);
  395.  
  396.     /* This stat_err_array MUST be freed by caller */
  397.     readdirplus_resp->stat_err_array =
  398.         (PVFS_error *) calloc(err_array_len, 1);
  399.     if (readdirplus_resp->stat_err_array == NULL)
  400.     {
  401.         return -PVFS_ENOMEM;
  402.     }
  403.     readdirplus_resp->attr_array =
  404.         (PVFS_sys_attr *) calloc(attr_array_len, 1);
  405.     if (readdirplus_resp->attr_array == NULL)
  406.     {
  407.         free(readdirplus_resp->stat_err_array);
  408.         readdirplus_resp->stat_err_array = NULL;
  409.         return -PVFS_ENOMEM;
  410.     }
  411.  
  412.     sm_p->u.readdirplus.svr_count = 0;
  413.     sm_p->u.readdirplus.server_addresses = NULL;
  414.     sm_p->u.readdirplus.handles = NULL;
  415.     sm_p->u.readdirplus.handle_count = NULL;
  416.     sm_p->u.readdirplus.nhandles = readdirplus_resp->pvfs_dirent_outcount;
  417.     sm_p->u.readdirplus.input_handle_array = (struct handle_to_index *) 
  418.         calloc(sm_p->u.readdirplus.nhandles, sizeof(struct handle_to_index));
  419.     if (sm_p->u.readdirplus.input_handle_array == NULL) 
  420.     {
  421.         free(readdirplus_resp->attr_array);
  422.         readdirplus_resp->attr_array = NULL;
  423.         free(readdirplus_resp->stat_err_array);
  424.         readdirplus_resp->stat_err_array = NULL;
  425.         return -PVFS_ENOMEM;
  426.     }
  427.     sm_p->u.readdirplus.obj_attr_array = (PVFS_object_attr *)
  428.         calloc(sm_p->u.readdirplus.nhandles, sizeof(PVFS_object_attr));
  429.     if (sm_p->u.readdirplus.obj_attr_array == NULL) 
  430.     {
  431.         free(readdirplus_resp->attr_array);
  432.         readdirplus_resp->attr_array = NULL;
  433.         free(readdirplus_resp->stat_err_array);
  434.         readdirplus_resp->stat_err_array = NULL;
  435.         return -PVFS_ENOMEM;
  436.     }
  437.     sm_p->u.readdirplus.size_array = (PVFS_size **)
  438.         calloc(sm_p->u.readdirplus.nhandles, sizeof(PVFS_size *));
  439.     if (sm_p->u.readdirplus.size_array == NULL)
  440.     {
  441.         free(readdirplus_resp->attr_array);
  442.         readdirplus_resp->attr_array = NULL;
  443.         free(readdirplus_resp->stat_err_array);
  444.         readdirplus_resp->stat_err_array = NULL;
  445.         return -PVFS_ENOMEM;
  446.     }
  447.  
  448.     for (i = 0; i < sm_p->u.readdirplus.nhandles; i++)
  449.     {
  450.         sm_p->u.readdirplus.input_handle_array[i].handle = 
  451.                 readdirplus_resp->dirent_array[i].handle;
  452.         sm_p->u.readdirplus.input_handle_array[i].handle_index = i;
  453.         /* aux index is not used for meta handles */
  454.         sm_p->u.readdirplus.input_handle_array[i].aux_index = -1;
  455.     }
  456.     ret = create_partition_handles(sm_p->object_ref.fs_id,
  457.                             sm_p->u.readdirplus.nhandles,
  458.                             sm_p->u.readdirplus.input_handle_array,
  459.                             &sm_p->u.readdirplus.svr_count, /* number of servers */
  460.                             &sm_p->u.readdirplus.server_addresses, /* array of server addresses */
  461.                             &sm_p->u.readdirplus.handle_count, /* array of counts of handles to each server */
  462.                             &sm_p->u.readdirplus.handles); /* actual per-server handle array */
  463.     return ret;
  464. }
  465.  
  466. /* Setup phase 1 stuff */
  467. static PINT_sm_action readdirplus_fetch_attrs_setup_msgpair(struct PINT_smcb *smcb,
  468.                                job_status_s *js_p)
  469. {
  470.     int i, ret;
  471.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  472.     PINT_sm_msgpair_state *msg_p = NULL;
  473.  
  474.     gossip_debug(GOSSIP_CLIENT_DEBUG, "readdirplus state: fetch_attrs_setup\n");
  475.     /* if there are no dirents then return NO_WORK */
  476.     if (sm_p->u.readdirplus.readdirplus_resp->pvfs_dirent_outcount == 0)
  477.     {
  478.         gossip_debug(GOSSIP_CLIENT_DEBUG, "readdirplus: no dirent to read; return\n");
  479.         js_p->error_code = NO_WORK;
  480.         return SM_ACTION_COMPLETE;
  481.     }
  482.  
  483.     /* From the readdirplus structure figure out which meta servers
  484.      * we need to speak to to get the attribute information
  485.      */
  486.      if ((ret = list_of_meta_servers(sm_p)) < 0) 
  487.      {
  488.          gossip_err("Could not locate list of attribute servers %d\n", ret);
  489.          js_p->error_code = ret;
  490.          return SM_ACTION_COMPLETE;
  491.      }
  492.      if (sm_p->u.readdirplus.svr_count == 0)
  493.      {
  494.          gossip_err("Number of meta servers to contact cannot be 0 %d\n", -PVFS_EINVAL);
  495.          js_p->error_code = -PVFS_EINVAL;
  496.          return SM_ACTION_COMPLETE;
  497.      }
  498.      ret = PINT_msgpairarray_init(
  499.          &sm_p->msgarray_op, sm_p->u.readdirplus.svr_count);
  500.      if(ret != 0)
  501.      {
  502.          gossip_err("Failed to initialize %d msgpairs\n",
  503.                     sm_p->u.readdirplus.svr_count);
  504.          js_p->error_code = ret;
  505.          return SM_ACTION_COMPLETE;
  506.      }
  507.  
  508.      foreach_msgpair(&sm_p->msgarray_op, msg_p, i)
  509.      {
  510.         PINT_SERVREQ_LISTATTR_FILL(
  511.             msg_p->req,
  512.             *sm_p->cred_p,
  513.             sm_p->object_ref.fs_id,
  514.             sm_p->u.readdirplus.attrmask,
  515.             sm_p->u.readdirplus.handle_count[i],
  516.             sm_p->u.readdirplus.handles[i],
  517.             sm_p->hints);
  518.         msg_p->fs_id = sm_p->object_ref.fs_id;
  519.         msg_p->handle = PVFS_HANDLE_NULL;
  520.         msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
  521.         msg_p->comp_fn = readdirplus_fetch_attrs_comp_fn;
  522.         msg_p->svr_addr = sm_p->u.readdirplus.server_addresses[i];
  523.      }
  524.      /* immediate return. next state jumps to msgpairarray machine */
  525.      js_p->error_code = 0;
  526.      PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
  527.      return SM_ACTION_COMPLETE;
  528. }
  529.  
  530. /* Phase 1 completion callback */
  531. static int readdirplus_fetch_attrs_comp_fn(void *v_p,
  532.                                struct PVFS_server_resp *resp_p,
  533.                                int index)
  534. {
  535.     PINT_smcb *smcb = v_p;
  536.     PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
  537.  
  538.     gossip_debug(GOSSIP_LISTATTR_DEBUG,
  539.                  "readdirplus_fetch_attrs_comp_fn called\n");
  540.     assert(resp_p->op == PVFS_SERV_LISTATTR);
  541.  
  542.     /* Mark all handles in this server range as having failed a stat */
  543.     if (sm_p->msgarray_op.msgarray[index].op_status != 0) {
  544.         int i, handle_index;
  545.         for (i = 0; i < sm_p->u.readdirplus.handle_count[index]; i++) {
  546.             get_handle_index(sm_p->u.readdirplus.input_handle_array,
  547.                             sm_p->u.readdirplus.nhandles,
  548.                             sm_p->u.readdirplus.handles[index][i],
  549.                             &handle_index,
  550.                             NULL);
  551.             assert(handle_index >= 0);
  552.             sm_p->u.readdirplus.readdirplus_resp->stat_err_array[handle_index] =
  553.                             sm_p->msgarray_op.msgarray[index].op_status;
  554.         }
  555.     }
  556.     else if (sm_p->msgarray_op.msgarray[index].op_status == 0)
  557.     {
  558.         /* fetch all errors from the servresp structure and copy the object attributes */
  559.         int i, handle_index;
  560.  
  561.         /* make sure that we get back responses for all handles that we sent out */
  562.         assert(resp_p->u.listattr.nhandles == sm_p->u.readdirplus.handle_count[index]);
  563.         for (i = 0; i < sm_p->u.readdirplus.handle_count[index]; i++) 
  564.         {
  565.             get_handle_index(sm_p->u.readdirplus.input_handle_array,
  566.                             sm_p->u.readdirplus.nhandles,
  567.                             sm_p->u.readdirplus.handles[index][i],
  568.                             &handle_index,
  569.                             NULL);
  570.             assert(handle_index >= 0);
  571.             /* Copy any errors */
  572.             sm_p->u.readdirplus.readdirplus_resp->stat_err_array[handle_index] =
  573.                             resp_p->u.listattr.error[i];
  574.             if (resp_p->u.listattr.error[i] == 0)
  575.             {
  576.                 /* if no errors, stash the object attributes */
  577.                 PINT_copy_object_attr(&sm_p->u.readdirplus.obj_attr_array[handle_index], 
  578.                                       &resp_p->u.listattr.attr[i]);
  579.             }
  580.         }
  581.     }
  582.  
  583.     /* if this is the last response, check all the status values
  584.        and return error codes if any requests failed
  585.      */
  586.     if (index == (sm_p->msgarray_op.count - 1))
  587.     {
  588.         int i;
  589.         for (i = 0; i < sm_p->msgarray_op.count; i++) 
  590.         {
  591.             if (sm_p->msgarray_op.msgarray[i].op_status != 0)
  592.             {
  593.                 return sm_p->msgarray_op.msgarray[i].op_status;
  594.             }
  595.         }
  596.         /* destroy scratch space.. we need to reuse them in phase 2 */
  597.         destroy_partition_handles(&sm_p->u.readdirplus.svr_count, 
  598.                            &sm_p->u.readdirplus.server_addresses,
  599.                            &sm_p->u.readdirplus.handle_count,
  600.                            &sm_p->u.readdirplus.handles);
  601.         free(sm_p->u.readdirplus.input_handle_array);
  602.         sm_p->u.readdirplus.input_handle_array = NULL;
  603.         sm_p->u.readdirplus.nhandles = 0;
  604.     }
  605.     return 0;
  606. }
  607.  
  608. /* figure out which data servers need to be contacted */
  609. static int list_of_data_servers(PINT_client_sm *sm_p)
  610. {
  611.     int i, ret, nhandles;
  612.  
  613.     sm_p->u.readdirplus.svr_count = 0;
  614.     sm_p->u.readdirplus.server_addresses = NULL;
  615.     sm_p->u.readdirplus.handles = NULL;
  616.     sm_p->u.readdirplus.handle_count = NULL;
  617.     /* Go thru the list of handles and find out which ones are regular files
  618.      * and send out messages to servers for the sizes of the dfile handles 
  619.      */
  620.      nhandles = 0;
  621.      for (i = 0; i < sm_p->u.readdirplus.readdirplus_resp->pvfs_dirent_outcount; i++) 
  622.      {
  623.         /* skip if the file is stuffed */
  624.          if (sm_p->u.readdirplus.obj_attr_array[i].objtype == PVFS_TYPE_METAFILE && (sm_p->u.readdirplus.obj_attr_array[i].mask & PVFS_ATTR_META_UNSTUFFED))
  625.          {
  626.              if (sm_p->u.readdirplus.attrmask & PVFS_ATTR_META_ALL)
  627.              {
  628.                  assert(sm_p->u.readdirplus.obj_attr_array[i].mask & PVFS_ATTR_META_ALL);
  629.              }
  630.              nhandles += sm_p->u.readdirplus.obj_attr_array[i].u.meta.dfile_count;
  631.              /* Allocate size_array here */
  632.              sm_p->u.readdirplus.size_array[i] = (PVFS_size *)
  633.                 calloc(sm_p->u.readdirplus.obj_attr_array[i].u.meta.dfile_count, sizeof(PVFS_size));
  634.             if (sm_p->u.readdirplus.size_array[i] == NULL) 
  635.             {
  636.                 return -PVFS_ENOMEM;
  637.             }
  638.          }
  639.      }
  640.      /* no meta files */
  641.      if (nhandles == 0)
  642.          return 0;
  643.      sm_p->u.readdirplus.nhandles = nhandles;
  644.      sm_p->u.readdirplus.input_handle_array = (struct handle_to_index *)
  645.         calloc(nhandles, sizeof(struct handle_to_index));
  646.      if (sm_p->u.readdirplus.input_handle_array == NULL)
  647.      {
  648.          return -PVFS_ENOMEM;
  649.      }
  650.      nhandles = 0;
  651.      for (i = 0; i < sm_p->u.readdirplus.readdirplus_resp->pvfs_dirent_outcount; i++) 
  652.      {
  653.         /* skip if the file is stuffed */
  654.          if (sm_p->u.readdirplus.obj_attr_array[i].objtype == PVFS_TYPE_METAFILE && (sm_p->u.readdirplus.obj_attr_array[i].mask & PVFS_ATTR_META_UNSTUFFED))
  655.          {
  656.              int j;
  657.              if (sm_p->u.readdirplus.attrmask & PVFS_ATTR_META_DIST)
  658.              {
  659.                  assert(sm_p->u.readdirplus.obj_attr_array[i].mask & PVFS_ATTR_META_DIST);
  660.                  assert(sm_p->u.readdirplus.obj_attr_array[i].u.meta.dist);
  661.                  assert(sm_p->u.readdirplus.obj_attr_array[i].u.meta.dist_size > 0);
  662.              }
  663.              if (sm_p->u.readdirplus.attrmask & PVFS_ATTR_META_DFILES)
  664.              {
  665.                  assert(sm_p->u.readdirplus.obj_attr_array[i].mask & PVFS_ATTR_META_DFILES);
  666.                  assert(sm_p->u.readdirplus.obj_attr_array[i].u.meta.dfile_array);
  667.                  assert(sm_p->u.readdirplus.obj_attr_array[i].u.meta.dfile_count > 0);
  668.              }
  669.              for (j = 0; j < sm_p->u.readdirplus.obj_attr_array[i].u.meta.dfile_count; j++) 
  670.              {
  671.                  sm_p->u.readdirplus.input_handle_array[nhandles].handle = 
  672.                     sm_p->u.readdirplus.obj_attr_array[i].u.meta.dfile_array[j];
  673.                  sm_p->u.readdirplus.input_handle_array[nhandles].handle_index = i;
  674.                  sm_p->u.readdirplus.input_handle_array[nhandles].aux_index = j;
  675.                  nhandles++;
  676.              }
  677.          }
  678.      }
  679.      ret = create_partition_handles(sm_p->object_ref.fs_id,
  680.                             sm_p->u.readdirplus.nhandles,
  681.                             sm_p->u.readdirplus.input_handle_array,
  682.                             &sm_p->u.readdirplus.svr_count, /* number of servers */
  683.                             &sm_p->u.readdirplus.server_addresses, /* array of server addresses */
  684.                             &sm_p->u.readdirplus.handle_count, /* array of counts of handles to each server */
  685.                             &sm_p->u.readdirplus.handles); /* actual per-server handle array */
  686.  
  687.     return ret;
  688. }
  689.  
  690. /* Setup phase 2 stuff */
  691. static PINT_sm_action readdirplus_fetch_sizes_setup_msgpair(
  692.         struct PINT_smcb *smcb, job_status_s *js_p)
  693. {
  694.     int i, ret;
  695.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  696.     PINT_sm_msgpair_state *msg_p;
  697.  
  698.     PINT_msgpairarray_destroy(&sm_p->msgarray_op);
  699.  
  700.     /* don't need sizes */
  701.     if (!(sm_p->u.readdirplus.attrmask & PVFS_ATTR_META_ALL)
  702.         && !(sm_p->u.readdirplus.attrmask & PVFS_ATTR_DATA_SIZE)) {
  703.         js_p->error_code = NO_WORK;
  704.         return SM_ACTION_COMPLETE;
  705.     }
  706.    
  707.      /* ok, now we have all the data files. split it on a per-server basis */
  708.      if ((ret = list_of_data_servers(sm_p)) < 0) 
  709.      {
  710.          js_p->error_code = ret;
  711.          return SM_ACTION_COMPLETE;
  712.      }
  713.      if (sm_p->u.readdirplus.svr_count == 0)
  714.      {
  715.          /* no need to contact any server since there are no regular meta files */
  716.          js_p->error_code = NO_WORK;
  717.          return SM_ACTION_COMPLETE;
  718.      }
  719.  
  720.      ret = PINT_msgpairarray_init(&sm_p->msgarray_op, sm_p->u.readdirplus.svr_count);
  721.      if(ret != 0)
  722.      {
  723.          gossip_err("Failed to initialize %d msgpairs\n",
  724.                     sm_p->u.readdirplus.svr_count);
  725.          js_p->error_code = ret;
  726.          return SM_ACTION_COMPLETE;
  727.      }
  728.  
  729.      foreach_msgpair(&sm_p->msgarray_op, msg_p, i)
  730.      {
  731.         PINT_SERVREQ_LISTATTR_FILL(
  732.             msg_p->req,
  733.             *sm_p->cred_p,
  734.             sm_p->object_ref.fs_id,
  735.             PVFS_ATTR_DATA_SIZE,
  736.             sm_p->u.readdirplus.handle_count[i],
  737.             sm_p->u.readdirplus.handles[i],
  738.             sm_p->hints);
  739.         msg_p->fs_id = sm_p->object_ref.fs_id;
  740.         msg_p->handle = PVFS_HANDLE_NULL;
  741.         msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
  742.         msg_p->comp_fn = readdirplus_fetch_sizes_comp_fn;
  743.         msg_p->svr_addr = sm_p->u.readdirplus.server_addresses[i];
  744.  
  745.      }
  746.      /* immediate return. next state jumps to msgpairarray machine */
  747.      js_p->error_code = 0;
  748.  
  749.      PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
  750.      return SM_ACTION_COMPLETE;
  751. }
  752.  
  753. /* Phase 2 completion callback */
  754. static int readdirplus_fetch_sizes_comp_fn(void *v_p,
  755.                                struct PVFS_server_resp *resp_p,
  756.                                int index)
  757. {
  758.     PINT_smcb *smcb = v_p;
  759.     PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
  760.  
  761.     gossip_debug(GOSSIP_LISTATTR_DEBUG,
  762.                  "readdirplus_fetch_sizes_comp_fn called\n");
  763.     assert(resp_p->op == PVFS_SERV_LISTATTR);
  764.  
  765.     /* Mark all handles in this server range as having failed a stat */
  766.     if (sm_p->msgarray_op.msgarray[index].op_status != 0) {
  767.         int i, handle_index, aux_index;
  768.         for (i = 0; i < sm_p->u.readdirplus.handle_count[index]; i++) {
  769.             get_handle_index(sm_p->u.readdirplus.input_handle_array,
  770.                             sm_p->u.readdirplus.nhandles,
  771.                             sm_p->u.readdirplus.handles[index][i],
  772.                             &handle_index,
  773.                             &aux_index);
  774.             assert(handle_index >= 0 && aux_index >= 0);
  775.             sm_p->u.readdirplus.readdirplus_resp->stat_err_array[handle_index] = 
  776.                             sm_p->msgarray_op.msgarray[index].op_status;
  777.         }
  778.     }
  779.     else if (sm_p->msgarray_op.msgarray[index].op_status == 0)
  780.     {
  781.         /* fetch all errors from the servresp structure and copy the object attributes */
  782.         int i, handle_index, aux_index;
  783.  
  784.         /* make sure that we get back responses for all handles that we sent out */
  785.         assert(resp_p->u.listattr.nhandles == sm_p->u.readdirplus.handle_count[index]);
  786.         for (i = 0; i < sm_p->u.readdirplus.handle_count[index]; i++) 
  787.         {
  788.             get_handle_index(sm_p->u.readdirplus.input_handle_array,
  789.                             sm_p->u.readdirplus.nhandles,
  790.                             sm_p->u.readdirplus.handles[index][i],
  791.                             &handle_index,
  792.                             &aux_index);
  793.             /* Copy any errors */
  794.             sm_p->u.readdirplus.readdirplus_resp->stat_err_array[handle_index] =
  795.                             resp_p->u.listattr.error[i];
  796.             if (resp_p->u.listattr.error[i] == 0)
  797.             {
  798.                 /* if no errors, stash the object sizes */
  799.                 assert(resp_p->u.listattr.attr[i].objtype == PVFS_TYPE_DATAFILE);
  800.                 sm_p->u.readdirplus.size_array[handle_index][aux_index] = 
  801.                         resp_p->u.listattr.attr[i].u.data.size;
  802.             }
  803.         }
  804.     }
  805.     /* If this is the last server response, check all the status values
  806.      * and stash any error codes if any of them failed 
  807.      */
  808.     if (index == (sm_p->msgarray_op.count - 1))
  809.     {
  810.         return PINT_msgarray_status(&sm_p->msgarray_op);
  811.     }
  812.     return 0;
  813. }
  814.  
  815. static PINT_sm_action readdirplus_msg_failure(
  816.     struct PINT_smcb *smcb, job_status_s *js_p)
  817. {
  818.     gossip_debug(GOSSIP_CLIENT_DEBUG,
  819.                  "readdirplus state: readdirplus_msg_failure\n");
  820.     return SM_ACTION_COMPLETE;
  821. }
  822.  
  823. static PINT_sm_action readdirplus_cleanup(
  824.     struct PINT_smcb *smcb, job_status_s *js_p)
  825. {
  826.     int i;
  827.     PVFS_sysresp_readdirplus *readdirplus_resp;
  828.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  829.     gossip_debug(GOSSIP_CLIENT_DEBUG, "readdirplus state: cleanup\n");
  830.  
  831.     PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
  832.  
  833.     if (js_p->error_code == NO_WORK) {
  834.         js_p->error_code = 0;
  835.     }
  836.     sm_p->error_code = js_p->error_code;
  837.     gossip_debug(GOSSIP_READDIR_DEBUG, " final return code is %d\n",
  838.                  sm_p->error_code);
  839.  
  840.     readdirplus_resp = sm_p->u.readdirplus.readdirplus_resp;
  841.     assert(readdirplus_resp);
  842.     /* Walk through the object attributes and convert them into system attributes for those without errors */
  843.     for (i = 0; i < readdirplus_resp->pvfs_dirent_outcount; i++) 
  844.     {
  845.         if (readdirplus_resp->stat_err_array[i] == 0)
  846.         {
  847.             /* convert into sys attributes */
  848.             readdirplus_resp->attr_array[i].owner 
  849.                     = sm_p->u.readdirplus.obj_attr_array[i].owner;
  850.             readdirplus_resp->attr_array[i].group 
  851.                     = sm_p->u.readdirplus.obj_attr_array[i].group;
  852.             readdirplus_resp->attr_array[i].perms 
  853.                     = sm_p->u.readdirplus.obj_attr_array[i].perms;
  854.             readdirplus_resp->attr_array[i].atime 
  855.                     = sm_p->u.readdirplus.obj_attr_array[i].atime;
  856.             readdirplus_resp->attr_array[i].mtime 
  857.                     = sm_p->u.readdirplus.obj_attr_array[i].mtime;
  858.             readdirplus_resp->attr_array[i].ctime 
  859.                     = sm_p->u.readdirplus.obj_attr_array[i].ctime;
  860.             readdirplus_resp->attr_array[i].objtype 
  861.                     = sm_p->u.readdirplus.obj_attr_array[i].objtype;
  862.             readdirplus_resp->attr_array[i].mask
  863.                     = PVFS_util_object_to_sys_attr_mask
  864.                       (sm_p->u.readdirplus.obj_attr_array[i].mask);
  865.             if (readdirplus_resp->attr_array[i].objtype == PVFS_TYPE_METAFILE)
  866.             {
  867.               if (sm_p->u.readdirplus.attrmask & PVFS_ATTR_META_DIST )
  868.               {
  869.                   if(sm_p->u.readdirplus.obj_attr_array[i].mask 
  870.                       & PVFS_ATTR_META_UNSTUFFED)
  871.                   {
  872.                       PVFS_size (*logical_file_size)(void* params,
  873.                                                      uint32_t num_handles,
  874.                                                      PVFS_size *psizes) = NULL;
  875.                       /* compute the file size */
  876.                       assert(sm_p->u.readdirplus.size_array[i]);
  877.                       logical_file_size = 
  878.                         sm_p->u.readdirplus.obj_attr_array[i].u.meta.dist->methods->logical_file_size;
  879.                       assert(logical_file_size);
  880.                       assert(sm_p->u.readdirplus.obj_attr_array[i].u.meta.dist->params);
  881.  
  882.                       readdirplus_resp->attr_array[i].size = logical_file_size(
  883.                       sm_p->u.readdirplus.obj_attr_array[i].u.meta.dist->params,
  884.                       sm_p->u.readdirplus.obj_attr_array[i].u.meta.dfile_count,
  885.                       sm_p->u.readdirplus.size_array[i]);
  886.  
  887.                   }
  888.                   else
  889.                   {
  890.                      /* size for stuffed case */
  891.                      readdirplus_resp->attr_array[i].size = 
  892.                      sm_p->u.readdirplus.obj_attr_array[i].u.meta.stuffed_size;
  893.                   }
  894.                     readdirplus_resp->attr_array[i].mask |= PVFS_ATTR_SYS_SIZE;
  895.               }
  896.               if (sm_p->u.readdirplus.attrmask & PVFS_ATTR_META_DFILES) 
  897.               {
  898.                   readdirplus_resp->attr_array[i].dfile_count = 
  899.                       sm_p->u.readdirplus.obj_attr_array[i].u.meta.dfile_count;
  900.                   readdirplus_resp->attr_array[i].mask 
  901.                                                |= PVFS_ATTR_SYS_DFILE_COUNT;
  902.               }
  903.               if (sm_p->u.readdirplus.attrmask & PVFS_ATTR_META_MIRROR_DFILES)
  904.               {
  905.                readdirplus_resp->attr_array[i].mirror_copies_count =
  906.                sm_p->u.readdirplus.obj_attr_array[i].u.meta.mirror_copies_count;
  907.                readdirplus_resp->attr_array[i].mask 
  908.                            |= PVFS_ATTR_SYS_MIRROR_COPIES_COUNT;
  909.               }
  910.             }
  911.             else if (readdirplus_resp->attr_array[i].objtype ==
  912.                      PVFS_TYPE_DIRECTORY)
  913.             {
  914.                 readdirplus_resp->attr_array[i].dirent_count = 
  915.                     sm_p->u.readdirplus.obj_attr_array[i].u.dir.dirent_count;
  916.                 readdirplus_resp->attr_array[i].mask 
  917.                       |= PVFS_ATTR_SYS_DIRENT_COUNT;
  918.             }
  919.             else if (readdirplus_resp->attr_array[i].objtype == PVFS_TYPE_SYMLINK
  920.                     && sm_p->u.readdirplus.attrmask & PVFS_ATTR_SYMLNK_TARGET) 
  921.             {
  922.                 readdirplus_resp->attr_array[i].link_target =
  923.                     strdup(sm_p->u.readdirplus.obj_attr_array[i].u.sym.target_path);
  924.                 readdirplus_resp->attr_array[i].mask |= PVFS_ATTR_SYS_LNK_TARGET;
  925.             }
  926.             else 
  927.             {
  928.                 gossip_err("Invalid type %d in readdirplus\n", 
  929.                     readdirplus_resp->attr_array[i].objtype);
  930.             }
  931.         }
  932.     }
  933.     destroy_partition_handles(&sm_p->u.readdirplus.svr_count, 
  934.                        &sm_p->u.readdirplus.server_addresses,
  935.                        &sm_p->u.readdirplus.handle_count,
  936.                        &sm_p->u.readdirplus.handles);
  937.     for (i = 0; i < readdirplus_resp->pvfs_dirent_outcount; i++) 
  938.     {
  939.         if (sm_p->u.readdirplus.size_array[i])
  940.         {
  941.             free(sm_p->u.readdirplus.size_array[i]);
  942.             sm_p->u.readdirplus.size_array[i] = NULL;
  943.         }
  944.     }
  945.     free(sm_p->u.readdirplus.size_array);
  946.     sm_p->u.readdirplus.size_array = NULL;
  947.     free(sm_p->u.readdirplus.input_handle_array);
  948.     sm_p->u.readdirplus.input_handle_array = NULL;
  949.     for (i = 0; i < readdirplus_resp->pvfs_dirent_outcount; i++)
  950.     {
  951.         PINT_free_object_attr(&sm_p->u.readdirplus.obj_attr_array[i]);
  952.     }
  953.     free(sm_p->u.readdirplus.obj_attr_array);
  954.     sm_p->u.readdirplus.obj_attr_array = NULL;
  955.     PINT_msgpairarray_destroy(&sm_p->msgarray_op);
  956.     PINT_SET_OP_COMPLETE;
  957.     return SM_ACTION_TERMINATE;
  958. }
  959.  
  960. /*
  961.  * Local variables:
  962.  *  mode: c
  963.  *  c-indent-level: 4
  964.  *  c-basic-offset: 4
  965.  * End:
  966.  *
  967.  * vim: ft=c ts=8 sts=4 sw=4 expandtab
  968.  */
  969.