home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / client / sysint / sys-create.sm < prev    next >
Text File  |  2011-03-22  |  33KB  |  1,041 lines

  1. /* 
  2.  * Copyright © Acxiom Corporation, 2006
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. /** \file
  8.  *  \ingroup sysint
  9.  *
  10.  *  PVFS2 system interface routines for creating files.
  11.  */
  12.  
  13. #include <string.h>
  14. #include <assert.h>
  15.  
  16. #include "client-state-machine.h"
  17. #include "pvfs2-debug.h"
  18. #include "pvfs2-dist-simple-stripe.h"
  19. #include "job.h"
  20. #include "gossip.h"
  21. #include "str-utils.h"
  22. #include "pint-cached-config.h"
  23. #include "pint-distribution.h"
  24. #include "PINT-reqproto-encode.h"
  25. #include "pint-util.h"
  26. #include "pint-dist-utils.h"
  27. #include "ncache.h"
  28. #include "pvfs2-internal.h"
  29. #include "pvfs2-dist-varstrip.h"
  30.  
  31. extern job_context_id pint_client_sm_context;
  32.  
  /* Pseudo result code used only inside this state machine: create_cleanup
   * stores it in js_p->error_code to ask for another attempt, the machine
   * definition maps it to the init state, and create_init accepts it as the
   * only non-zero incoming code. */
  33. enum
  34. {
  35.     CREATE_RETRY = 170
  36. };
  37.  
  38. /* completion function prototypes */
  /* Each is installed as msg_p->comp_fn on the corresponding request and
   * returns 0 or an error code taken from the server response. */
  39. static int create_comp_fn(
  40.     void *v_p, struct PVFS_server_resp *resp_p, int index);
  41. static int create_crdirent_comp_fn(
  42.     void *v_p, struct PVFS_server_resp *resp_p, int index);
  43. static int create_delete_handles_comp_fn(
  44.     void *v_p, struct PVFS_server_resp *resp_p, int index);
  45.  
  46. /* misc helper functions */
  47. static PINT_dist* get_default_distribution(PVFS_fs_id fs_id);
  48.  
  49. %%
  50.  
  /* State machine driving a client-side file create.
   *
   * Normal path:
   *   init -> parent_getattr -> parent_getattr_inspect ->
   *   create_setup_msgpair -> create_xfer_msgpair ->
   *   crdirent_setup_msgpair -> crdirent_xfer_msgpair -> cleanup
   *
   * A failure in any state up to and including create_xfer_msgpair goes
   * straight to cleanup.  A failure while setting up or sending the
   * crdirent request goes through crdirent_failure and then the
   * delete_handles_* states before reaching cleanup.  cleanup either
   * terminates the machine or, on CREATE_RETRY, starts over at init.
   */
  51. machine pvfs2_client_create_sm
  52. {
  53.     state init
  54.     {
  55.         run create_init;
  56.         default => parent_getattr;
  57.     }
  58.  
  59.     state parent_getattr
  60.     {
  61.         jump pvfs2_client_getattr_sm;
  62.         success => parent_getattr_inspect;
  63.         default => cleanup;
  64.     }
  65.  
  66.     state parent_getattr_inspect
  67.     {
  68.         run create_parent_getattr_inspect;
  69.         success => create_setup_msgpair;
  70.         default => cleanup;
  71.     }
  72.  
  73.     state create_setup_msgpair
  74.     {
  75.         run create_create_setup_msgpair;
  76.         success => create_xfer_msgpair;
  77.         default => cleanup;
  78.     }
  79.  
  80.     state create_xfer_msgpair
  81.     {
  82.         jump pvfs2_msgpairarray_sm;
  83.         success => crdirent_setup_msgpair;
  84.         default => cleanup;
  85.     }
  86.  
  87.     state crdirent_setup_msgpair
  88.     {
  89.         run create_crdirent_setup_msgpair;
  90.         success => crdirent_xfer_msgpair;
  91.         default => crdirent_failure;
  92.     }
  93.  
  94.     state crdirent_xfer_msgpair
  95.     {
  96.         jump pvfs2_msgpairarray_sm;
  97.         success => cleanup;
  98.         default => crdirent_failure;
  99.     }
  100.  
  101.     state crdirent_failure
  102.     {
  103.         run create_crdirent_failure;
  104.         default => delete_handles_setup_msgpair_array;
  105.     }
  106.  
  107.     state delete_handles_setup_msgpair_array
  108.     {
  109.         run create_delete_handles_setup_msgpair_array;
  110.         success => delete_handles_xfer_msgpair_array;
  111.         default => cleanup;
  112.     }
  113.  
  114.     state delete_handles_xfer_msgpair_array
  115.     {
  116.         jump pvfs2_msgpairarray_sm;
  117.         default => cleanup;
  118.     }
  119.  
  120.     state cleanup
  121.     {
  122.         run create_cleanup;
  123.         CREATE_RETRY => init;
  124.         default => terminate;
  125.     }
  126. }
  127.  
  128. %%
  129.  
  130. /** Initiate creation of a file with a specified distribution.
  131.  */
  132. PVFS_error PVFS_isys_create(
  133.     char *object_name,
  134.     PVFS_object_ref parent_ref,
  135.     PVFS_sys_attr attr,
  136.     const PVFS_credentials *credentials,
  137.     PVFS_sys_dist *dist,
  138.     PVFS_sys_layout *layout,
  139.     PVFS_sysresp_create *resp,
  140.     PVFS_sys_op_id *op_id,
  141.     PVFS_hint hints,
  142.     void *user_ptr)
  143. {
  144.     PVFS_error ret = -PVFS_EINVAL;
  145.     PINT_smcb *smcb = NULL;
  146.     PINT_client_sm *sm_p = NULL;
  147.  
  148.     gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_create entered\n");
  149.  
  150.     if ((parent_ref.handle == PVFS_HANDLE_NULL) ||
  151.         (parent_ref.fs_id == PVFS_FS_ID_NULL) ||
  152.         (object_name == NULL) || (resp == NULL))
  153.     {
  154.         gossip_err("invalid (NULL) required argument\n");
  155.         return ret;
  156.     }
  157.  
  158.     if ((attr.mask & PVFS_ATTR_SYS_ALL_SETABLE) != PVFS_ATTR_SYS_ALL_SETABLE)
  159.     {
  160.         gossip_lerr("PVFS_isys_create() failure: invalid attribute mask: %d, "
  161.                     "expected SYS_ALL_SETABLE (%d)\n",
  162.                     attr.mask, PVFS_ATTR_SYS_ALL_SETABLE);
  163.         return ret;
  164.     }
  165.  
  166.     if ((attr.mask & PVFS_ATTR_SYS_DFILE_COUNT) &&
  167.         ((attr.dfile_count < 1) ||
  168.          (attr.dfile_count > PVFS_REQ_LIMIT_DFILE_COUNT)))
  169.     {
  170.     gossip_err("Error: invalid number of datafiles (%d) specified "
  171.                    "in PVFS_sys_create().\n", (int)attr.dfile_count);
  172.     return ret;
  173.     }
  174.  
  175.     if ((strlen(object_name) + 1) > PVFS_REQ_LIMIT_SEGMENT_BYTES)
  176.     {
  177.         return -PVFS_ENAMETOOLONG;
  178.     }
  179.  
  180.     PINT_smcb_alloc(&smcb, PVFS_SYS_CREATE,
  181.              sizeof(struct PINT_client_sm),
  182.              client_op_state_get_machine,
  183.              client_state_machine_terminate,
  184.              pint_client_sm_context);
  185.     if (smcb == NULL)
  186.     {
  187.         return -PVFS_ENOMEM;
  188.     }
  189.     sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  190.  
  191.     PINT_init_msgarray_params(sm_p, parent_ref.fs_id);
  192.     PINT_init_sysint_credentials(sm_p->cred_p, credentials);
  193.     sm_p->u.create.object_name = object_name;
  194.     sm_p->u.create.create_resp = resp;
  195.     PINT_CONVERT_ATTR(&sm_p->u.create.attr, &attr, PVFS_ATTR_META_ALL);
  196.  
  197.     /* save the original attribute passed in. since create does it's own
  198.      * retries we need the original attribute available on retries */
  199.     PINT_copy_object_attr(&(sm_p->u.create.store_attr), &(sm_p->u.create.attr));
  200.  
  201.     sm_p->u.create.stored_error_code = 0;
  202.     sm_p->u.create.retry_count = 0;
  203.     PVFS_hint_copy(hints, &sm_p->hints);
  204.     PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &parent_ref.handle);
  205.     sm_p->parent_ref = parent_ref;
  206.  
  207.     if(attr.mask & PVFS_ATTR_SYS_DFILE_COUNT)
  208.     {
  209.         sm_p->u.create.user_requested_num_data_files = attr.dfile_count;
  210.     }
  211.  
  212.     /* copy layout to sm struct */
  213.     if(layout)
  214.     {
  215.         /* make sure it is a supported layout */
  216.         switch(layout->algorithm)
  217.         {
  218.             /* these are valid */
  219.             case PVFS_SYS_LAYOUT_ROUND_ROBIN:
  220.             case PVFS_SYS_LAYOUT_RANDOM:
  221.             case PVFS_SYS_LAYOUT_LIST:
  222.                 break;
  223.             /* anything else is not */
  224.             default:
  225.                 return(-PVFS_EINVAL);
  226.         }
  227.  
  228.         sm_p->u.create.layout.algorithm = layout->algorithm;
  229.         if(layout->algorithm == PVFS_SYS_LAYOUT_LIST)
  230.         {
  231.             sm_p->u.create.layout.server_list.count = layout->server_list.count;
  232.             sm_p->u.create.layout.server_list.servers =
  233.                 malloc(layout->server_list.count * sizeof(PVFS_BMI_addr_t));
  234.             if(!sm_p->u.create.layout.server_list.servers)
  235.             {
  236.                 return -PVFS_ENOMEM;
  237.             }
  238.             memcpy(sm_p->u.create.layout.server_list.servers,
  239.                    layout->server_list.servers,
  240.                    layout->server_list.count * sizeof(PVFS_BMI_addr_t));
  241.         }
  242.     }
  243.     else
  244.     {
  245.         sm_p->u.create.layout.algorithm = PVFS_SYS_LAYOUT_ROUND_ROBIN;
  246.     }
  247.  
  248.     sm_p->object_ref = parent_ref;
  249.  
  250.     /* If the user specifies a distribution use that
  251.        else, use the default distribution */
  252.     if (dist)
  253.     {
  254.         if (!dist->name)
  255.         {
  256.             PINT_smcb_free(smcb);
  257.             return -PVFS_EINVAL;
  258.         }
  259.  
  260.         sm_p->u.create.dist = PINT_dist_create(dist->name);
  261.         if (!sm_p->u.create.dist)
  262.         {
  263.             PINT_smcb_free(smcb);
  264.             return -PVFS_ENOMEM;
  265.         }
  266.         sm_p->u.create.dist->params = dist->params;
  267.     }
  268.     else
  269.     {
  270.         /* Get the default distribution */
  271.         sm_p->u.create.dist =
  272.             get_default_distribution(sm_p->parent_ref.fs_id);
  273.         if (!sm_p->u.create.dist)
  274.         {
  275.             PINT_smcb_free(smcb);
  276.             return -PVFS_ENOMEM;
  277.         }
  278.     }
  279.  
  280.     gossip_debug(
  281.         GOSSIP_CLIENT_DEBUG, "Creating file %s under %llu, %d\n",
  282.         object_name, llu(parent_ref.handle), parent_ref.fs_id);
  283.  
  284.     return PINT_client_state_machine_post(
  285.         smcb,  op_id, user_ptr);
  286. }
  287.  
  288. /** Create a file with a specified distribution.
  289.  */
  290. PVFS_error PVFS_sys_create(
  291.     char *object_name,
  292.     PVFS_object_ref parent_ref,
  293.     PVFS_sys_attr attr,
  294.     const PVFS_credentials *credentials,
  295.     PVFS_sys_dist *dist,
  296.     PVFS_sysresp_create *resp,
  297.     PVFS_sys_layout *layout,
  298.     PVFS_hint hints)
  299. {
  300.     PVFS_error ret = -PVFS_EINVAL, error = 0;
  301.     PVFS_sys_op_id op_id;
  302.  
  303.     gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_create entered\n");
  304.  
  305.     ret = PVFS_isys_create(object_name, parent_ref, attr, credentials,
  306.                            dist, layout, resp, &op_id, hints, NULL);
  307.     if (ret)
  308.     {
  309.         PVFS_perror_gossip("PVFS_isys_create call", ret);
  310.         error = ret;
  311.     }
  312.     else
  313.     {
  314.         ret = PVFS_sys_wait(op_id, "create", &error);
  315.         if (ret)
  316.         {
  317.             PVFS_perror_gossip("PVFS_sys_wait call", ret);
  318.             error = ret;
  319.         }
  320.     }
  321.  
  322.     PINT_sys_release(op_id);
  323.     return error;
  324. }
  325.  
  326. /****************************************************************/
  327.  
  328. static PINT_sm_action create_init(
  329.         struct PINT_smcb *smcb, job_status_s *js_p)
  330. {
  331.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  332.     job_id_t tmp_id;
  333.  
  334.  
  335.     assert((js_p->error_code == 0) ||
  336.            (js_p->error_code == CREATE_RETRY));
  337.  
  338.     PINT_SM_GETATTR_STATE_FILL(
  339.         sm_p->getattr,
  340.         sm_p->object_ref,
  341.         PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_HINT, 
  342.         PVFS_TYPE_DIRECTORY,
  343.         0);
  344.  
  345.     if (js_p->error_code == CREATE_RETRY)
  346.     {
  347.         js_p->error_code = 0;
  348.  
  349.         return job_req_sched_post_timer(
  350.             sm_p->msgarray_op.params.retry_delay, smcb, 0, js_p, &tmp_id,
  351.             pint_client_sm_context);
  352.     }
  353.  
  354.  
  355.    return SM_ACTION_COMPLETE;
  356. }
  357.  
  358. static int create_comp_fn(void *v_p,
  359.                                   struct PVFS_server_resp *resp_p,
  360.                                   int index)
  361. {
  362.     PINT_smcb *smcb = v_p;
  363.     PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
  364.  
  365.     gossip_debug(GOSSIP_CLIENT_DEBUG, "create_create_comp_fn\n");
  366.  
  367.     assert(resp_p->op == PVFS_SERV_CREATE);
  368.  
  369.     if (resp_p->status != 0)
  370.     {
  371.         return resp_p->status;
  372.     }
  373.  
  374.     /* otherwise, just stash the newly created meta handle */
  375.     sm_p->u.create.metafile_handle =
  376.         resp_p->u.create.metafile_handle;
  377.     sm_p->u.create.datafile_count = resp_p->u.create.datafile_count;
  378.     sm_p->u.create.datafile_handles = malloc(
  379.         sizeof(*sm_p->u.create.datafile_handles) *
  380.         sm_p->u.create.datafile_count);
  381.     if(!sm_p->u.create.datafile_handles)
  382.     {
  383.         return -PVFS_ENOMEM;
  384.     }
  385.     memcpy(sm_p->u.create.datafile_handles,
  386.            resp_p->u.create.datafile_handles,
  387.            (sizeof(*sm_p->u.create.datafile_handles) *
  388.            resp_p->u.create.datafile_count));
  389.     sm_p->u.create.stuffed = resp_p->u.create.stuffed;
  390.  
  391.     gossip_debug(
  392.         GOSSIP_CLIENT_DEBUG, "*** Got newly created handle %llu\n",
  393.         llu(sm_p->u.create.metafile_handle));
  394.  
  395.     return 0;
  396. }
  397.  
  398. static int create_crdirent_comp_fn(void *v_p,
  399.                                    struct PVFS_server_resp *resp_p,
  400.                                    int index)
  401. {
  402.     gossip_debug(GOSSIP_CLIENT_DEBUG, "create_crdirent_comp_fn\n");
  403.  
  404.     assert(resp_p->op == PVFS_SERV_CRDIRENT);
  405.     return resp_p->status;
  406. }
  407.  
  408. static PINT_sm_action create_create_setup_msgpair(
  409.         struct PINT_smcb *smcb, job_status_s *js_p)
  410. {
  411.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  412.     int ret = -PVFS_EINVAL;
  413.     PVFS_handle_extent_array meta_handle_extent_array;
  414.     PINT_sm_msgpair_state *msg_p = NULL;
  415.     int server_type;
  416.  
  417.     gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: "
  418.                  "dspace_create_setup_msgpair\n");
  419.  
  420.     js_p->error_code = 0;
  421.  
  422.     gossip_debug(GOSSIP_CLIENT_DEBUG," create: posting create req\n");
  423.  
  424.     /* reset the attributes to what got passed in to the sysint call. the retry
  425.      * path comes through here so we'll want to reset it after each try. 
  426.      * force the mask to all meta attributes. */
  427.     if( sm_p->u.create.retry_count > 0 )
  428.     {
  429.         PINT_copy_object_attr(&(sm_p->u.create.attr), 
  430.                               &(sm_p->u.create.store_attr));
  431.         sm_p->u.create.attr.mask |= PVFS_ATTR_META_ALL;
  432.     }
  433.  
  434.     PINT_msgpair_init(&sm_p->msgarray_op);
  435.     msg_p = &sm_p->msgarray_op.msgpair;
  436.  
  437.     ret = PINT_cached_config_get_next_meta(
  438.         sm_p->object_ref.fs_id, &msg_p->svr_addr, &meta_handle_extent_array);
  439.     if(ret != 0)
  440.     {
  441.         gossip_err("Failed to map meta server address\n");
  442.         js_p->error_code = ret;
  443.         return SM_ACTION_COMPLETE;
  444.     }
  445.  
  446.     /* resolve and print selected server only if gossip debugging enabled */
  447.     if(gossip_debug_enabled(GOSSIP_CLIENT_DEBUG))
  448.     {
  449.         gossip_debug(GOSSIP_CLIENT_DEBUG, 
  450.             "PVFS_isys_create() selected meta server: %s\n", 
  451.             PINT_cached_config_map_addr(sm_p->object_ref.fs_id,
  452.                 msg_p->svr_addr,
  453.                 &server_type));
  454.     }
  455.  
  456.     PINT_SERVREQ_CREATE_FILL(
  457.         msg_p->req,
  458.         *sm_p->cred_p,
  459.         sm_p->object_ref.fs_id,
  460.         sm_p->u.create.attr,
  461.         sm_p->u.create.num_data_files,
  462.         sm_p->u.create.layout,
  463.         sm_p->hints);
  464.  
  465.     msg_p->fs_id = sm_p->object_ref.fs_id;
  466.     msg_p->handle = meta_handle_extent_array.extent_array[0].first;
  467.     msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
  468.     msg_p->comp_fn = create_comp_fn;
  469.     msg_p->req.u.create.attr.u.meta.dfile_count = 0;
  470.     msg_p->req.u.create.attr.u.meta.dist =
  471.         sm_p->u.create.dist;
  472.     msg_p->req.u.create.attr.u.meta.dist_size =
  473.         PINT_DIST_PACK_SIZE(sm_p->u.create.dist);
  474.  
  475.     PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
  476.     return SM_ACTION_COMPLETE;
  477. }
  478.  
  479. static PINT_sm_action create_crdirent_setup_msgpair(
  480.         struct PINT_smcb *smcb, job_status_s *js_p)
  481. {
  482.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  483.     int ret = -1;
  484.     PINT_sm_msgpair_state *msg_p = NULL;
  485.  
  486.     gossip_debug(GOSSIP_CLIENT_DEBUG,
  487.                  "create state: crdirent_setup_msgpair\n");
  488.  
  489.     js_p->error_code = 0;
  490.  
  491.     gossip_debug(GOSSIP_CLIENT_DEBUG,
  492.                  "create: %s: posting crdirent req: parent handle: %llu, "
  493.                  "name: %s, handle: %llu\n",
  494.                  __func__,
  495.                  llu(sm_p->object_ref.handle), sm_p->u.create.object_name,
  496.                  llu(sm_p->u.create.metafile_handle));
  497.  
  498.     PINT_msgpair_init(&sm_p->msgarray_op);
  499.     msg_p = &sm_p->msgarray_op.msgpair;
  500.  
  501.     PINT_SERVREQ_CRDIRENT_FILL(
  502.         msg_p->req,
  503.         *sm_p->cred_p,
  504.         sm_p->u.create.object_name,
  505.         sm_p->u.create.metafile_handle,
  506.         sm_p->object_ref.handle,
  507.         sm_p->object_ref.fs_id,
  508.         sm_p->hints);
  509.  
  510.     msg_p->fs_id = sm_p->object_ref.fs_id;
  511.     msg_p->handle = sm_p->object_ref.handle;
  512.     msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
  513.     msg_p->comp_fn = create_crdirent_comp_fn;
  514.  
  515.     ret = PINT_cached_config_map_to_server(
  516.         &msg_p->svr_addr, sm_p->object_ref.handle,
  517.         sm_p->object_ref.fs_id);
  518.  
  519.     if (ret)
  520.     {
  521.         gossip_err("Failed to map meta server address\n");
  522.         js_p->error_code = ret;
  523.     }
  524.  
  525.     PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
  526.     return SM_ACTION_COMPLETE;
  527. }
  528.  
  529. static PINT_sm_action create_crdirent_failure(
  530.         struct PINT_smcb *smcb, job_status_s *js_p)
  531. {
  532.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  533.     gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: crdirent_failure\n");
  534.  
  535.     sm_p->u.create.stored_error_code = js_p->error_code;
  536.  
  537.     if (sm_p->u.create.stored_error_code == -PVFS_EEXIST)
  538.     {
  539.         gossip_debug(GOSSIP_CLIENT_DEBUG, "crdirent failed: "
  540.                      "dirent already exists!\n");
  541.     }
  542.     return SM_ACTION_COMPLETE;
  543. }
  544.  
  545. static PINT_sm_action create_cleanup(
  546.         struct PINT_smcb *smcb, job_status_s *js_p)
  547. {
  548.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  549.     PVFS_object_ref metafile_ref;
  550.     PVFS_size tmp_size = 0;
  551.     int ret;
  552.  
  553.     gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: cleanup\n");
  554.  
  555.     PINT_free_object_attr(&sm_p->u.create.attr);
  556.  
  557.     PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
  558.  
  559.     sm_p->error_code = (sm_p->u.create.stored_error_code ?
  560.                         sm_p->u.create.stored_error_code :
  561.                         js_p->error_code);
  562.  
  563.     memset(&metafile_ref, 0, sizeof(metafile_ref));
  564.  
  565.     if (sm_p->error_code == 0)
  566.     {
  567.         metafile_ref.handle = sm_p->u.create.metafile_handle;
  568.         metafile_ref.fs_id = sm_p->object_ref.fs_id;
  569.  
  570.         /* fill in outgoing response fields */
  571.         sm_p->u.create.create_resp->ref = metafile_ref;
  572.  
  573.         /* insert newly created metafile into the ncache */
  574.         PINT_ncache_update((const char*) sm_p->u.create.object_name, 
  575.                            (const PVFS_object_ref*) &metafile_ref, 
  576.                            (const PVFS_object_ref*) &(sm_p->object_ref));
  577.  
  578.         sm_p->u.create.attr.mask |= PVFS_ATTR_META_DFILES;
  579.         sm_p->u.create.attr.u.meta.dfile_array =
  580.             sm_p->u.create.datafile_handles;
  581.         sm_p->u.create.attr.u.meta.dfile_count =
  582.             sm_p->u.create.datafile_count;
  583.  
  584.         if(sm_p->u.create.stuffed)
  585.         {
  586.             gossip_debug(GOSSIP_CLIENT_DEBUG, "created stuffed file\n");
  587.             sm_p->u.create.attr.u.meta.stuffed_size = 0;
  588.         }
  589.         else
  590.         {
  591.             gossip_debug(GOSSIP_CLIENT_DEBUG, "created un-stuffed file\n");
  592.             sm_p->u.create.attr.mask |= PVFS_ATTR_META_UNSTUFFED;
  593.         }
  594.  
  595.         if(sm_p->u.create.dist)
  596.         {
  597.             sm_p->u.create.attr.u.meta.dist = sm_p->u.create.dist;
  598.             sm_p->u.create.attr.u.meta.dist_size = PINT_DIST_PACK_SIZE(sm_p->u.create.dist);
  599.             sm_p->u.create.attr.mask |= PVFS_ATTR_META_DIST;
  600.         }
  601.  
  602.         /* we only insert a cache entry if the entire create succeeds,
  603.          * set size to 0 
  604.          */ 
  605.         /* Also, make sure to clear time masks.  The server is responsible
  606.          * for setting that.
  607.          */
  608.         sm_p->u.create.attr.mask &= (~(PVFS_ATTR_COMMON_MTIME));
  609.         sm_p->u.create.attr.mask &= (~(PVFS_ATTR_COMMON_CTIME));
  610.         sm_p->u.create.attr.mask &= (~(PVFS_ATTR_COMMON_ATIME));
  611.         ret = PINT_acache_update(metafile_ref,
  612.                                  &sm_p->u.create.attr,
  613.                                  &tmp_size);
  614.         if(ret < 0)
  615.         {
  616.             js_p->error_code = ret;
  617.         }
  618.     }
  619.     else if ((PVFS_ERROR_CLASS(-sm_p->error_code) == PVFS_ERROR_BMI) &&
  620.              (sm_p->u.create.retry_count < sm_p->msgarray_op.params.retry_limit))
  621.     {
  622.         sm_p->u.create.stored_error_code = 0;
  623.         sm_p->u.create.retry_count++;
  624.  
  625.         gossip_debug(GOSSIP_CLIENT_DEBUG, "Retrying create operation "
  626.                      "(attempt number %d)\n", sm_p->u.create.retry_count);
  627.  
  628.         js_p->error_code = CREATE_RETRY;
  629.         return SM_ACTION_COMPLETE;
  630.     }
  631.  
  632.     if(sm_p->u.create.layout.algorithm == PVFS_SYS_LAYOUT_LIST)
  633.     {
  634.         free(sm_p->u.create.layout.server_list.servers);
  635.         sm_p->u.create.layout.server_list.servers = NULL;
  636.     }
  637.  
  638.     if(sm_p->u.create.dist)
  639.     {
  640.         PINT_dist_free(sm_p->u.create.dist);
  641.         sm_p->u.create.dist = NULL;
  642.     }
  643.  
  644.     if(sm_p->u.create.datafile_handles)
  645.     {
  646.         free(sm_p->u.create.datafile_handles);
  647.         sm_p->u.create.datafile_handles = NULL;
  648.     }
  649.  
  650.     PINT_msgpairarray_destroy(&sm_p->msgarray_op);
  651.  
  652.     PINT_SET_OP_COMPLETE;
  653.     return SM_ACTION_TERMINATE;
  654. }
  655.  
  656. /** looks at the attributes of the parent directory and decides if it impacts
  657.  *  the file creation in any way
  658.  */
  659. static PINT_sm_action create_parent_getattr_inspect(
  660.         struct PINT_smcb *smcb, job_status_s *js_p)
  661. {
  662.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  663.     PVFS_object_attr *attr = NULL;
  664.     PINT_dist *current_dist; 
  665.     int ret = 0;
  666.     int num_dfiles_requested = 0;
  667.  
  668.     gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: parent_getattr_inspect\n");
  669.  
  670.     attr = &sm_p->getattr.attr;
  671.     assert(attr);
  672.  
  673.     gossip_debug(GOSSIP_CLIENT_DEBUG, "parent owner: %d, group: %d, perms: %d\n",
  674.         (int)attr->owner, (int)attr->group, (int)attr->perms);
  675.  
  676.     /* do we have a setgid bit? */
  677.     if(attr->perms & PVFS_G_SGID)
  678.     {
  679.         gossip_debug(GOSSIP_CLIENT_DEBUG, "parent has setgid bit set.\n");
  680.         gossip_debug(GOSSIP_CLIENT_DEBUG, " - modifying requested attr "
  681.                                           "for new file.\n");
  682.         sm_p->u.create.attr.group = attr->group;
  683.         /* note that permission checking is left to server even in this case */
  684.     }
  685.     gossip_debug(GOSSIP_CLIENT_DEBUG, "create_parent_getattr: [%p] "
  686.         "dfile_count     = %d "
  687.         "dist_name_len   = %d "
  688.         "dist_params_len = %d\n",
  689.         attr,
  690.         attr->u.dir.hint.dfile_count,
  691.         attr->u.dir.hint.dist_name_len,
  692.         attr->u.dir.hint.dist_params_len);
  693.  
  694.     current_dist = sm_p->u.create.dist;
  695.     /* We have an overriding distribution name for this directory.. honor that */
  696.     if (attr->u.dir.hint.dist_name_len > 0)
  697.     {
  698.         /* switch it only if it is different! */
  699.         if (strcmp(attr->u.dir.hint.dist_name, current_dist->dist_name))
  700.         {
  701.             PINT_dist *new_dist = NULL;
  702.             new_dist = PINT_dist_create(attr->u.dir.hint.dist_name);
  703.             if (new_dist)
  704.             {
  705.                 gossip_debug(GOSSIP_CLIENT_DEBUG,
  706.                              "Overridding distribution name to %s instead of %s\n",
  707.                     attr->u.dir.hint.dist_name,
  708.                     current_dist->dist_name);
  709.                 PINT_dist_free(current_dist);
  710.                 sm_p->u.create.dist = new_dist;
  711.                 current_dist = new_dist;
  712.             }
  713.             else
  714.             {
  715.                 gossip_debug(
  716.                     GOSSIP_CLIENT_DEBUG,
  717.                     "Could not override distribution name with %s instead of %s\n",
  718.                     attr->u.dir.hint.dist_name,
  719.                     current_dist->dist_name);
  720.             }
  721.         }
  722.         else {
  723.             gossip_debug(
  724.                 GOSSIP_CLIENT_DEBUG,
  725.                 "retaining current distribution name %s\n",
  726.                 current_dist->dist_name);
  727.         }
  728.     }
  729.  
  730.     /* okay, we might need to override some dist params as well */
  731.     if (attr->u.dir.hint.dist_params_len > 0)
  732.     {
  733.         /* We have a series of comma separated key:val strings */
  734.         char **key, **val;
  735.         int64_t tmp_val;
  736.         int nparams = 0;
  737.  
  738.         if (strncmp(current_dist->dist_name, 
  739.             PVFS_DIST_VARSTRIP_NAME, 
  740.             PVFS_DIST_VARSTRIP_NAME_SIZE) == 0) 
  741.         {
  742.             /* varstrip parameters are a special case; we can't use the
  743.              * normal split_keyvals function because the : separater is also
  744.              * used within paramers that only varstrip can parse
  745.              */ 
  746.  
  747.             /* look for a "strips:" prefix */
  748.             if(strstr(attr->u.dir.hint.dist_params, "strips:") 
  749.                 != attr->u.dir.hint.dist_params)
  750.             {
  751.                 gossip_err("Error: failed to parse directory hints for varstrip distribution.\n");
  752.                 js_p->error_code = -PVFS_EINVAL;
  753.                 return SM_ACTION_COMPLETE;
  754.             }
  755.             if(current_dist->methods->set_param(current_dist->dist_name,
  756.                 current_dist->params,
  757.                 "strips",
  758.                 &attr->u.dir.hint.dist_params[strlen("strips:")]))
  759.             {
  760.                 gossip_err("Error: failed to set directory hints for varstrip distribution.\n");
  761.                 js_p->error_code = -PVFS_EINVAL;
  762.                 return SM_ACTION_COMPLETE;
  763.             }
  764.         }
  765.         /* ignore parse errors! */
  766.         /* TODO: why should we ignore parsing errors? */
  767.         else if (PINT_split_keyvals(attr->u.dir.hint.dist_params,
  768.             &nparams, &key, &val) == 0)
  769.         {
  770.             int i;
  771.             for (i = 0; i < nparams; i++)
  772.             {
  773.                 gossip_debug(GOSSIP_CLIENT_DEBUG,
  774.                              "distribution parameter %s, value %s\n",
  775.                              key[i], val[i]);
  776.                 /* NOTE: just as in server-config.c when parsing "Param" and
  777.                  * "Value" fields, we will assume that all values are 64 bit
  778.                  * integers.  The only difference here is that we scan
  779.                  * directly into a 64 bit integer, rather than converting
  780.                  * from the int format that dotconf supports.
  781.                  */
  782.                 ret = sscanf(val[i], SCANF_lld, &tmp_val);
  783.                 if(ret != 1)
  784.                 {
  785.                     gossip_err(
  786.                         "Error: unsupported type for distribution parameter %s, "
  787.                         "value %s found in directory hints.\n", 
  788.                         key[i], val[i]);
  789.                     gossip_err("Error: continuing anyway.\n");
  790.                 }
  791.                 else
  792.                 {
  793.                     if(current_dist->methods->set_param(current_dist->dist_name,
  794.                         current_dist->params,
  795.                         key[i],
  796.                         &tmp_val))
  797.                     {
  798.  
  799.                         gossip_err(
  800.                             "Error: could not override hinted distribution "
  801.                             "parameter %s, value %s found in directory hints\n",
  802.                             key[i], val[i]);
  803.                     }
  804.                  }
  805.                  free(key[i]);
  806.                  free(val[i]);
  807.             }
  808.             free(key);
  809.             free(val);
  810.         }
  811.     }
  812.  
  813.     /* priority for determining user's preference for number of data files:
  814.      * 1) count specified in attr's passed into sys_create
  815.      * 2) directory hints
  816.      * 3) mount options
  817.      * 4) system default
  818.      * All of the above can be overridden by the distribution itself.
  819.      */
  820.  
  821.     if(sm_p->u.create.user_requested_num_data_files > 0)
  822.     {
  823.         /* specified by sys_create caller */
  824.         num_dfiles_requested = sm_p->u.create.user_requested_num_data_files;     
  825.     }
  826.     else if(attr->u.dir.hint.dfile_count > 0)
  827.     {
  828.         num_dfiles_requested = attr->u.dir.hint.dfile_count;
  829.     }
  830.     else
  831.     {
  832.         /* Check the mount options */
  833.         int rc;
  834.         struct PVFS_sys_mntent mntent;
  835.  
  836.         rc = PVFS_util_get_mntent_copy(sm_p->object_ref.fs_id, &mntent);
  837.         if (0 == rc)
  838.         {
  839.             num_dfiles_requested = mntent.default_num_dfiles;
  840.             PVFS_util_free_mntent(&mntent);
  841.         }
  842.     }
  843.  
  844.     /* Determine the number of dfiles.   Pass in the number requested by the
  845.      * client, but will be overridden by default configuration and/or
  846.      * distribution if necessary 
  847.      */
  848.     ret = PINT_cached_config_get_num_dfiles(sm_p->object_ref.fs_id,
  849.                                             sm_p->u.create.dist,
  850.                                             num_dfiles_requested,
  851.                                             &sm_p->u.create.num_data_files);
  852.     if(ret < 0)
  853.     {
  854.         gossip_err("Error: failed to get number of data servers\n");
  855.         js_p->error_code = ret;
  856.         return SM_ACTION_COMPLETE;
  857.     }
  858.  
  859.     gossip_debug(GOSSIP_CLIENT_DEBUG, "Setting number of datafiles to %d [requested %d]\n", 
  860.         sm_p->u.create.num_data_files, num_dfiles_requested);
  861.  
  862.     return SM_ACTION_COMPLETE;
  863. }
  864.  
  865. /**
  866.  * Returns the default distribution, or NULL if the distribution could not
  867.  * be created.  The default distribution is read from the server
  868.  * configuration if possible.  If the server config does not specify a
  869.  * default distribution, simple_stripe will be used.
  870.  */
  871. static PINT_dist* get_default_distribution(PVFS_fs_id fs_id)
  872. {
  873.     server_configuration_s* server_config = NULL;
  874.     PINT_dist* dist = NULL;
  875.  
  876.     /* Retrieve the server configuration (with mutex) */
  877.     server_config = PINT_get_server_config_struct(fs_id);
  878.  
  879.     /* If a default dist is specified in the config, use that
  880.        else just create a simple_stripe distribution */
  881.     if (NULL != server_config &&
  882.         NULL != server_config->default_dist_config.name)
  883.     {
  884.         dist = PINT_dist_create(server_config->default_dist_config.name);
  885.         
  886.         if (dist)
  887.         {
  888.             PINT_llist_p iter = server_config->default_dist_config.param_list;
  889.  
  890.             /* Set supplied the distribution parameters */
  891.             while (iter)
  892.             {
  893.                 int rc;
  894.                 distribution_param_configuration* param =PINT_llist_head(iter);
  895.  
  896.                 /* If we are at the list end, break
  897.                    else, set the distribution parameter to the given value */
  898.                 if (NULL == param)
  899.                 {
  900.                     break;
  901.                 }
  902.                 else
  903.                 {
  904.                     rc = dist->methods->set_param(dist->dist_name,
  905.                                                   dist->params,
  906.                                                   param->name,
  907.                                                   ¶m->value);
  908.  
  909.                     if (0 != rc)
  910.                     {
  911.                         gossip_err("Error setting distribution parameter\n"
  912.                                    "  dist: %s\n"
  913.                                    "  param name: %s\n"
  914.                                    "  param value: %lld\n",
  915.                                    dist->dist_name, param->name,
  916.                                    lld(param->value));
  917.                     }
  918.                 }
  919.                 iter = PINT_llist_next(iter);
  920.             }                
  921.         }
  922.         else
  923.         {
  924.             gossip_err("Error creating default distribution: %s\n",
  925.                        server_config->default_dist_config.name);
  926.         }
  927.     }
  928.     else
  929.     {
  930.         dist = PINT_dist_create(PVFS_DIST_SIMPLE_STRIPE_NAME);
  931.     }
  932.  
  933.     /* Release the server config mutex */
  934.     PINT_put_server_config_struct(server_config);
  935.  
  936.     return dist;
  937. }
  938.  
  939. static int create_delete_handles_comp_fn(void *v_p,
  940.                                          struct PVFS_server_resp *resp_p,
  941.                                          int index)
  942. {
  943.     gossip_debug(GOSSIP_CLIENT_DEBUG, "create_delete_handles_comp_fn\n");
  944.  
  945.     assert(resp_p->op == PVFS_SERV_REMOVE);
  946.  
  947.     if (resp_p->status != 0)
  948.     {
  949.         gossip_debug(GOSSIP_CLIENT_DEBUG,
  950.                      "Failed to remove handle number %d\n", index);
  951.     }
  952.     return resp_p->status;
  953. }
  954.  
  955. /* delete the newly created meta and data handles */
  956. static PINT_sm_action create_delete_handles_setup_msgpair_array(
  957.         struct PINT_smcb *smcb, job_status_s *js_p)
  958. {
  959.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  960.     int ret = -PVFS_EINVAL, i = 0;
  961.     PINT_sm_msgpair_state *msg_p = NULL;
  962.  
  963.     gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: "
  964.                  "delete_handles_setup_msgpair_array\n");
  965.  
  966.     js_p->error_code = 0;
  967.  
  968.     ret = PINT_msgpairarray_init(&sm_p->msgarray_op, (sm_p->u.create.datafile_count+1));
  969.     if(ret != 0)
  970.     {
  971.         gossip_err("Failed to initialize %d msgpairs\n", (sm_p->u.create.datafile_count+1));
  972.         js_p->error_code = ret;
  973.         return(SM_ACTION_COMPLETE);
  974.     }
  975.  
  976.     /*
  977.       for the metafile and each datafile, prepare to post a remove
  978.       send/recv pair
  979.     */
  980.     foreach_msgpair(&sm_p->msgarray_op, msg_p, i)
  981.     {
  982.         gossip_debug(GOSSIP_CLIENT_DEBUG,
  983.                      "create: posting data file remove req %d\n",i);
  984.  
  985.         /* arbitrarily handle deletion of the metafile last */
  986.         if (i == sm_p->u.create.datafile_count)
  987.         {
  988.             PINT_SERVREQ_REMOVE_FILL(
  989.                 msg_p->req,
  990.                 *sm_p->cred_p,
  991.                 sm_p->object_ref.fs_id,
  992.                 sm_p->u.create.metafile_handle,
  993.                 sm_p->hints);
  994.  
  995.             msg_p->fs_id = sm_p->object_ref.fs_id;
  996.             msg_p->handle = sm_p->u.create.metafile_handle;
  997.             msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
  998.             msg_p->comp_fn = create_delete_handles_comp_fn;
  999.  
  1000.             gossip_debug(GOSSIP_CLIENT_DEBUG, " Preparing to remove "
  1001.                          "metafile handle %llu\n", llu(msg_p->handle));
  1002.         }
  1003.         else
  1004.         {
  1005.             PINT_SERVREQ_REMOVE_FILL(
  1006.                 msg_p->req,
  1007.                 *sm_p->cred_p,
  1008.                 sm_p->object_ref.fs_id,
  1009.                 sm_p->u.create.datafile_handles[i],
  1010.                 sm_p->hints);
  1011.  
  1012.             msg_p->fs_id = sm_p->object_ref.fs_id;
  1013.             msg_p->handle = sm_p->u.create.datafile_handles[i];
  1014.             msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
  1015.             msg_p->comp_fn = create_delete_handles_comp_fn;
  1016.  
  1017.             gossip_debug(GOSSIP_CLIENT_DEBUG, " Preparing to remove "
  1018.                          "datafile handle %llu\n", llu(msg_p->handle));
  1019.         }
  1020.     }
  1021.     ret = PINT_serv_msgpairarray_resolve_addrs(&sm_p->msgarray_op);
  1022.     if(ret)
  1023.     {
  1024.         gossip_err("Error: failed to resolve server addresses.\n");
  1025.         js_p->error_code = ret;
  1026.     }
  1027.  
  1028.     PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
  1029.     return SM_ACTION_COMPLETE;
  1030. }
  1031.  
  1032. /*
  1033.  * Local variables:
  1034.  *  mode: c
  1035.  *  c-indent-level: 4
  1036.  *  c-basic-offset: 4
  1037.  * End:
  1038.  *
  1039.  * vim: ft=c ts=8 sts=4 sw=4 expandtab
  1040.  */
  1041.