home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / server / mirror.sm < prev    next >
Text File  |  2009-04-30  |  39KB  |  1,045 lines

  1. /* 
  2.  * (C) 2001 Clemson University and The University of Chicago 
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#include "server-config.h"
#include "pvfs2-server.h"
#include "pvfs2-attr.h"
#include "pvfs2-internal.h"
#include "pvfs2-util.h"
#include "pint-util.h"
#include "pint-eattr.h"
#include "pint-cached-config.h"
#include "client-state-machine.h"
  19.  
/* Job context owned by the client-side state machine code, declared here as
 * an external symbol.
 * NOTE(review): not referenced in the part of this file reviewed here -- the
 * actions below post all of their jobs on server_job_context. Confirm
 * whether this declaration is still needed. */
extern job_context_id pint_client_sm_context;
  21.  
  22. #define WRITE_ACK_RCV 0
  23. #define SRC_FLOW_POST 1
  24. #define NUM_OF_PHASES 2
  25.  
  26. #define PHASE(__tag) (__tag%NUM_OF_PHASES)
  27. #define DST(__tag) (__tag/NUM_OF_PHASES)
  28. #define TAG(__phase,__dst) ( (NUM_OF_PHASES * __dst) + __phase )
  29.  
  30.  
  31.  
  32. enum
  33. {
  34.     NO_DATA_TO_COPY = 100,
  35.     COMM_DONE       = 400,
  36. };
  37.  
/* Completion callback installed as msg_p->comp_fn on every PVFS_SERV_IO
 * write msgpair built in setup_write_request. Its definition is not in the
 * part of the file reviewed here.
 * NOTE(review): presumably it records each destination's IO result in
 * s_op->u.mirror.jobs[i].io_status, which cleanup_msgpairarray and
 * post_ack_and_flow read -- confirm against the definition. */
int write_comp_fn(void *v_p, struct PVFS_server_resp *resp_p, int i);
  39.  
  40.  
%%
machine pvfs2_mirror_sm
{
   /* Top-level server state machine for a mirror request:
    *   prelude -> inspect_inputs -> mirror_do_work (nested
    *   pvfs2_mirror_work_sm) -> final_response -> mirror_cleanup.
    * A failure in the prelude or in inspect_inputs skips the work phase and
    * goes directly to final_response. */
   state prelude
    {
        jump pvfs2_prelude_sm;
        success => inspect_inputs;
        default => final_response;
    }

   /* copies the source datafile size into the request's bsize and logs
    * the request */
   state inspect_inputs
    {
         run inspect_inputs;
         success => mirror_do_work;
         default => final_response;
    }

   /* the copy itself; every outcome proceeds to final_response */
   state mirror_do_work
    {
         jump pvfs2_mirror_work_sm;
         default => final_response;
    }

   state final_response
    {
       jump pvfs2_final_response_sm;
       default => mirror_cleanup;
    }

   /* hands the finished operation to server_state_machine_complete() */
   state mirror_cleanup
    {
       run mirror_cleanup;
       default => terminate;
    } 

} /*end machine pvfs2_mirror_sm*/

%%
  79.  
%%
nested machine pvfs2_mirror_work_sm
{
   /* Copies the source datafile (src_handle) to every destination handle
    * in the mirror request:
    *   initialize_structures -> setup_write_request -> call_msgpairarray
    *   (one PVFS_SERV_IO write request per destination) ->
    *   cleanup_msgpairarray -> post_ack_and_flow (one write-ack receive and
    *   one Trove->BMI flow per destination) -> check_comm (re-entered for
    *   each completed ack/flow job until it returns COMM_DONE) ->
    *   check_results -> cleanup_mirror_work -> return.
    * A failure in any setup step routes to cleanup_mirror_work. */
   state initialize_structures
    {
        run initialize_structures;
        /* source datafile is empty (bsize == 0): nothing to send */
        NO_DATA_TO_COPY => cleanup_mirror_work;
        success => setup_write_request;
        default => cleanup_mirror_work;
    } 

   /* builds the msgpair array and pushes it for the jump below */
   state setup_write_request
    {
        run setup_write_request;
        success => call_msgpairarray;
        default => cleanup_mirror_work;
    }

   state call_msgpairarray
    {
        jump pvfs2_msgpairarray_sm;

        /* Every outcome goes to cleanup_msgpairarray. A non-success result
         * means at least one of the IO requests failed. cleanup_msgpairarray
         * inspects the per-destination io_status and fails the operation
         * only when all of them failed. */
        default => cleanup_msgpairarray;
    }

   state cleanup_msgpairarray
    {
        run cleanup_msgpairarray;
        success => post_ack_and_flow;
        default => cleanup_mirror_work;
    }

   state post_ack_and_flow
    {
        run post_ack_and_flow;
        success => check_comm;
        default => cleanup_mirror_work;
    }

   /* Runs once per completed ack/flow job.
    * NOTE(review): presumably returns COMM_DONE once mir_op->job_count
    * reaches 0 and defers otherwise -- confirm in check_comm. */
   state check_comm
    {
        run check_comm;
        COMM_DONE => check_results;
        default   => check_comm;
    }

   state check_results
    {
        run check_results;
        default => cleanup_mirror_work;
    }

   state cleanup_mirror_work
    {
        run cleanup_mirror_work;
        default => return;
    }
   
} /*end nested machine pvfs2_mirror_work_sm*/
%%
  144.  
  145. /*START OF pvfs2_mirror_sm*/
  146. static PINT_sm_action inspect_inputs(struct PINT_smcb *smcb, job_status_s *js_p)
  147. {
  148.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing mirror:inspect_inputs....\n");
  149.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tframe count is %d.\n",smcb->frame_count);
  150.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\t base frame is %d.\n",smcb->base_frame);
  151.     struct PINT_server_op *s_op = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  152.     struct PVFS_servreq_mirror *reqmir_p = &(s_op->req->u.mirror);
  153.     int i;
  154.  
  155.     js_p->error_code = 0;
  156.  
  157.     reqmir_p->bsize = s_op->ds_attr.u.datafile.b_size;
  158.     
  159.     if (s_op->req)
  160.     {
  161.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\tREQUEST STRUCTURE EXISTS.\n");
  162.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\trequest->op:%d"
  163.                                          "\tmirror.src_handle:%llu"
  164.                                          "\tmirror.dst_count: %d"
  165.                                          "\tmirror.fs_id:%d"
  166.                                          "\tmirror.dist.name:%s"
  167.                                          "\tmirror.bsize:%d"
  168.                                          "\tmirror.src_server_nr:%d"
  169.                                          "\n"
  170.                                          ,s_op->req->op
  171.                                          ,llu(reqmir_p->src_handle) 
  172.                                          ,reqmir_p->dst_count
  173.                                          ,reqmir_p->fs_id
  174.                                          ,reqmir_p->dist->dist_name
  175.                                          ,reqmir_p->bsize
  176.                                          ,reqmir_p->src_server_nr);
  177.  
  178.       for (i=0; i<reqmir_p->dst_count; i++)
  179.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\treqmir->dst_handle[%d] : %llu\n"
  180.                                           ,i
  181.                                           ,llu(reqmir_p->dst_handle[i]));
  182.     }
  183.  
  184.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tds_attr.b_size:%d\n"
  185.                                     ,(int)s_op->ds_attr.u.datafile.b_size);
  186.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tobject type:%0x\n"
  187.                                     ,(int)s_op->attr.objtype);
  188.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tdatafile size:%d\n"
  189.                                     ,(int)s_op->attr.u.data.size);
  190.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmask:%0x\n"
  191.                                     ,s_op->attr.mask);
  192.     return SM_ACTION_COMPLETE;
  193. }/*end action inspect_inputs*/
  194.  
  195.  
  196. static PINT_sm_action mirror_cleanup( struct PINT_smcb *smcb
  197.                                      ,job_status_s *js_p)
  198. {
  199.  
  200.    return(server_state_machine_complete(smcb));
  201.  
  202. }/*end action mirror_cleanup*/
  203.  
  204.  
  205.  
  206. /******************************************************************************/
  207. /* START OF pvfs2_mirror_work_sm                                              */
  208. /******************************************************************************/
  209. static PINT_sm_action initialize_structures(struct PINT_smcb *smcb
  210.                                            ,job_status_s *js_p)
  211. {
  212.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing mirror:"
  213.                                      "initialize_structures...\n");
  214.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tframe count is %d.\n",smcb->frame_count);
  215.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\t base frame is %d.\n",smcb->base_frame);
  216.  
  217.     struct PINT_server_op *s_op = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  218.     struct PVFS_servreq_mirror *reqmir_p = &(s_op->req->u.mirror);
  219.     struct PVFS_servresp_mirror *respmir_p = &(s_op->resp.u.mirror);
  220.     struct PINT_server_mirror_op *mir_p = &(s_op->u.mirror);
  221.     int i,ret;
  222.  
  223.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\ts_op->op:%d\n",s_op->op);
  224.  
  225.     js_p->error_code = 0;
  226.  
  227.     memset(respmir_p,0,sizeof(*respmir_p));
  228.  
  229.     respmir_p->src_handle        = reqmir_p->src_handle;
  230.     respmir_p->src_server_nr     = reqmir_p->src_server_nr;
  231.     respmir_p->dst_count         = reqmir_p->dst_count;
  232.  
  233.     respmir_p->bytes_written = malloc(sizeof(uint32_t) * respmir_p->dst_count);
  234.     if (!respmir_p->bytes_written)
  235.     {
  236.        gossip_lerr("Unable to allocate respmir_p->bytes_written\n");
  237.        js_p->error_code = -PVFS_ENOMEM;
  238.        return SM_ACTION_COMPLETE;
  239.     }
  240.     memset(respmir_p->bytes_written,0,sizeof(uint32_t) * respmir_p->dst_count);
  241.  
  242.     respmir_p->write_status_code = malloc(sizeof(uint32_t) *
  243.                                           respmir_p->dst_count);
  244.     if (!respmir_p->write_status_code)
  245.     {
  246.        gossip_lerr("Unable to allocate respmir->write_status_code.\n");
  247.        js_p->error_code = -PVFS_ENOMEM;
  248.        return SM_ACTION_COMPLETE;
  249.     }
  250.     memset(respmir_p->write_status_code,0,sizeof(uint32_t) * 
  251.                                           respmir_p->dst_count);
  252.  
  253.     if (reqmir_p->bsize == 0)
  254.     {
  255.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNo data to copy...\n");
  256.         js_p->error_code  = NO_DATA_TO_COPY;
  257.         return SM_ACTION_COMPLETE;
  258.     }
  259.  
  260.     if (s_op->req)
  261.     {
  262.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\trequest->op:%d"
  263.                                          "\tmirror.src_handle:%llu"
  264.                                          "\tmirror.fs_id:%d"
  265.                                          "\tmirror.dist.name:%s"
  266.                                          "\tmirror.bsize:%d"
  267.                                          "\tmirror.src_server_nr:%d"
  268.                                          "\tmirror.dst_count:%d"
  269.                                          "\n"
  270.                                          ,s_op->req->op
  271.                                          ,llu(reqmir_p->src_handle) 
  272.                                          ,reqmir_p->fs_id
  273.                                          ,reqmir_p->dist->dist_name
  274.                                          ,reqmir_p->bsize
  275.                                          ,reqmir_p->src_server_nr
  276.                                          ,reqmir_p->dst_count);
  277.       for (i=0; i<reqmir_p->dst_count; i++)
  278.       {
  279.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmirror.dst_handle[%d] : %llu\n"
  280.                                           ,i
  281.                                           ,llu(reqmir_p->dst_handle[i]));
  282.       }
  283.     }
  284.  
  285.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tCreating jobs array....\n");
  286.  
  287.     mir_p->jobs = malloc(sizeof(write_job_t) * reqmir_p->dst_count);
  288.     if (!mir_p->jobs)
  289.     {
  290.        gossip_lerr("Unable to allocate jobs array.\n");
  291.        js_p->error_code = -PVFS_ENOMEM;
  292.        return SM_ACTION_COMPLETE;
  293.     }    
  294.     memset(mir_p->jobs,0,sizeof(write_job_t) * reqmir_p->dst_count);
  295.  
  296.     /*Scheduling occurred in pvfs2_mirror_sm/prelude if the schedule_id is    */
  297.     /*already provided.  Otherwise, this mirror request was called as a nested*/
  298.     /*function, and therefore needs to be scheduled.                          */
  299.     if (s_op->scheduled_id != 0) 
  300.         return SM_ACTION_COMPLETE;
  301.  
  302.     ret = job_req_sched_post(s_op->op
  303.                             ,reqmir_p->fs_id
  304.                             ,reqmir_p->src_handle
  305.                             ,PINT_server_req_get_access_type(s_op->req)
  306.                             ,PINT_server_req_get_sched_policy(s_op->req)
  307.                             ,smcb
  308.                             ,0
  309.                             ,js_p
  310.                             ,&(s_op->scheduled_id)
  311.                             ,server_job_context);
  312.     return (ret);
  313. }/*end action initialize_structures*/
  314.  
  315.  
  316.  
  317. static PINT_sm_action setup_write_request(struct PINT_smcb *smcb
  318.                                          ,job_status_s *js_p)
  319. {
  320.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing mirror:"
  321.                                      "setup_write_request.....\n");
  322.  
  323.     struct PINT_server_op *s_op = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  324.     struct PVFS_servreq_mirror *reqmir_p = &(s_op->req->u.mirror);
  325.     struct PINT_server_mirror_op *mir_p  = &(s_op->u.mirror);
  326.     write_job_t *jobs = mir_p->jobs;
  327.     int ret,i;
  328.     PVFS_Request myFileReq = PVFS_BYTE;
  329.     PVFS_offset  myFileReqOffset = 0;
  330.  
  331.     js_p->error_code = 0;
  332.  
  333.     /*initialize msgarray_op structure*/
  334.     PINT_sm_msgarray_op *msgarray_op = &(s_op->msgarray_op);
  335.     memset(msgarray_op,0,sizeof(PINT_sm_msgarray_op));
  336.  
  337.     /*parameters are setup like a client except for job_context*/
  338.     PINT_serv_init_msgarray_params(s_op,reqmir_p->fs_id);
  339.  
  340.     /*allocate a mspair_state structure for each destination handle.*/
  341.     ret=PINT_msgpairarray_init(msgarray_op,reqmir_p->dst_count);
  342.     if (ret)
  343.     {
  344.         gossip_lerr("Failed to allocate msgarray.\n");
  345.         js_p->error_code = ret;
  346.         return SM_ACTION_COMPLETE;
  347.     }
  348.  
  349.     /*setup msgpairarray to initiate PVFS_SERV_IO write request for the       */
  350.     /*destination handles.                                                    */
  351.     for (i=0; i<reqmir_p->dst_count; i++)
  352.     {
  353.        PINT_sm_msgpair_state *msg_p = &(msgarray_op->msgarray[i]);
  354.  
  355.        msg_p->fs_id      = reqmir_p->fs_id;
  356.        msg_p->handle     = reqmir_p->dst_handle[i];
  357.        msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
  358.        msg_p->comp_fn    = write_comp_fn;    
  359.  
  360.        /*determine the BMI svr address for the destination handle*/
  361.        ret = PINT_cached_config_map_to_server(&msg_p->svr_addr
  362.                                              ,msg_p->handle
  363.                                              ,msg_p->fs_id );
  364.        if (ret)
  365.        {
  366.            gossip_lerr("Failed to map address\n");
  367.            js_p->error_code = ret;
  368.            return SM_ACTION_COMPLETE;
  369.        }
  370.  
  371.        /*save the svr_addr for later use*/
  372.        jobs[i].svr_addr = msg_p->svr_addr;
  373.  
  374.  
  375.        /*setup the server PVFS_SERV_IO write request itself*/
  376.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\treqmir_p->bsize:%d.\n"
  377.                                        ,reqmir_p->bsize);
  378.        PINT_SERVREQ_IO_FILL( msg_p->req  
  379.                             ,s_op->req->credentials
  380.                             ,reqmir_p->fs_id
  381.                             ,reqmir_p->dst_handle[i]
  382.                             ,PVFS_IO_WRITE
  383.                             ,reqmir_p->flow_type
  384.                             ,0
  385.                             ,1
  386.                             ,reqmir_p->dist
  387.                             ,myFileReq
  388.                             ,myFileReqOffset
  389.                             ,reqmir_p->bsize
  390.                             ,NULL );
  391.     }/*end for*/
  392.  
  393.     PINT_sm_push_frame(smcb,0,msgarray_op);
  394.   
  395.     return SM_ACTION_COMPLETE;
  396.  
  397. }/*end action setup_write_request*/
  398.  
  399.  
  400. static PINT_sm_action cleanup_msgpairarray( struct PINT_smcb *smcb
  401.                                           , job_status_s *js_p)
  402. {
  403.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing MIRROR:cleanup_msgpairarray\n");
  404.  
  405.     struct PINT_server_op *s_op = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  406.     struct PVFS_servreq_mirror *reqmir_p = &(s_op->req->u.mirror);
  407.  
  408.     PINT_sm_msgarray_op *msgarray_op = &(s_op->msgarray_op);
  409.     PINT_server_mirror_op *mir_op = &(s_op->u.mirror);
  410.     write_job_t *jobs = mir_op->jobs;
  411.     int i;
  412.  
  413.     js_p->error_code = 0;
  414.  
  415.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tCURRENT:\tsmcb->base_frame:%d"
  416.                                      "\tframe_count:%d\n"
  417.                                     ,smcb->base_frame,smcb->frame_count);
  418.  
  419.     /*if ALL msgpairs have errors, then set an error code and skip the rest */
  420.     /*of this request.                                                      */
  421.     for (i=0; i<reqmir_p->dst_count; i++)
  422.     {
  423.         if (jobs[i].io_status == 0)
  424.            break;
  425.     }
  426.     if (i==reqmir_p->dst_count)
  427.     {
  428.         js_p->error_code = -PVFS_EIO;
  429.         return SM_ACTION_COMPLETE;
  430.     }
  431.  
  432.     /*retain the session/flow identifier created in the PVFS_SERV_IO request */
  433.     /*for each destination handle.                                           */
  434.     for (i=0; i<reqmir_p->dst_count; i++)
  435.     {
  436.        if (jobs[i].io_status == 0)
  437.        {
  438.           jobs[i].session_tag = msgarray_op->msgarray[i].session_tag;
  439.        } else {
  440.           jobs[i].session_tag = 0; /*session tags are NEVER zero*/
  441.                                    /*PINT_util_get_next_tag()   */
  442.        } 
  443.     }
  444.  
  445.     /*will free msgarray if necessary*/
  446.     PINT_msgpairarray_destroy(msgarray_op);
  447.  
  448.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tIO_STATUS & SESSION TAG\n");
  449.     for (i=0; i<reqmir_p->dst_count; i++)
  450.     {
  451.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tio_status[%d]:%d "
  452.                                          "\tsession_tag[%d]:%d\n"
  453.                                         ,i,jobs[i].io_status
  454.                                         ,i,jobs[i].session_tag);
  455.     }
  456.  
  457.  
  458.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Leaving MIRROR:cleanup msgpairarray.\n");
  459.  
  460.     return SM_ACTION_COMPLETE;
  461. }/*end action cleanup_msgpairarray*/
  462.  
  463.  
  464.  
  465.  
  466. static PINT_sm_action post_ack_and_flow (struct PINT_smcb *smcb
  467.                                         ,job_status_s *js_p)
  468. {
  469.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing mirror:post_ack_and_flow..\n");
  470.  
  471.     struct PINT_server_op *s_op = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  472.     struct PVFS_servreq_mirror *reqmir_p = &(s_op->req->u.mirror);
  473.     PINT_server_mirror_op      *mir_op   = &(s_op->u.mirror);
  474.     write_job_t *jobs = mir_op->jobs;
  475.  
  476.     int ret,i;
  477.     unsigned long status_user_tag = 0;
  478.  
  479.     js_p->error_code  = 0;
  480.  
  481.     mir_op->job_count = 0;
  482.     mir_op->max_resp_sz = PINT_encode_calc_max_size( PINT_ENCODE_RESP
  483.                                                     ,PVFS_SERV_WRITE_COMPLETION
  484.                                                     ,reqmir_p->encoding );
  485.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tmax_resp_sz:%d\n",mir_op->max_resp_sz);
  486.  
  487.     /*get flow info from the server configuration file.*/
  488.     struct filesystem_configuration_s *cur_fs = NULL;
  489.     struct server_configuration_s *server_config = NULL;
  490.  
  491.     server_config = get_server_config_struct();
  492.     cur_fs = PINT_config_find_fs_id(server_config,reqmir_p->fs_id);
  493.          
  494.     /*post write-ack and flow for each destination handle*/
  495.     for (i=0; i<reqmir_p->dst_count; i++)
  496.     { 
  497.        /*if the initial IO request for this destination handle failed, then */
  498.        /*skip it.                                                           */
  499.        if (jobs[i].io_status != 0)
  500.        {
  501.            gossip_debug(GOSSIP_MIRROR_DEBUG,"io_status[%d] : %d ...skipping\n"
  502.                                            ,i,jobs[i].io_status);
  503.            continue;
  504.        }
  505.  
  506.        jobs[i].encoded_resp_p = BMI_memalloc( jobs[i].svr_addr
  507.                                              ,mir_op->max_resp_sz
  508.                                              ,BMI_RECV );
  509.        if (!jobs[i].encoded_resp_p)
  510.        {
  511.            gossip_lerr("mirror:BMI_memalloc (for write ack) failed.\n");
  512.            js_p->error_code = -PVFS_ENOMEM;
  513.            continue; 
  514.        }
  515.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tencoded response successfully "
  516.                                         "allocated.\n");
  517.  
  518.        /*pre-post this recv with an infinite timeout and adjust it after the  */
  519.        /*flow completes, since we don't know how long a flow can take at this */
  520.        /*point.                                                               */
  521.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\trecv_id:%d\n",(int)jobs[i].recv_id);
  522.        status_user_tag = TAG(WRITE_ACK_RCV,i);
  523.        ret = job_bmi_recv( jobs[i].svr_addr
  524.                           ,jobs[i].encoded_resp_p
  525.                           ,mir_op->max_resp_sz
  526.                           ,jobs[i].session_tag
  527.                           ,BMI_PRE_ALLOC
  528.                           ,smcb
  529.                           ,status_user_tag
  530.                           ,&(jobs[i].recv_status)
  531.                           ,&(jobs[i].recv_id)
  532.                           ,server_job_context
  533.                           ,JOB_TIMEOUT_INF
  534.                           ,NULL );
  535.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tWRITE_ACK_RCV:return code:%d\n"
  536.                                    ,ret);
  537.        /*we expect this job to __not__ complete immediately, since we have not*/
  538.        /*posted the flow.                                                     */
  539.        if (ret == 1 && jobs[i].recv_status.error_code)
  540.        {   
  541.            /*Error posting the job*/
  542.            js_p->error_code = jobs[i].recv_status.error_code;
  543.            continue;
  544.        }
  545.        else if (ret == 1)
  546.        {
  547.            /*Job completed immediately with no errors.  In this context, */
  548.            /*immediate completion is an error.                           */
  549.            js_p->error_code = -EPERM; /*operation not permitted.*/
  550.            continue;
  551.        }
  552.        else if (ret != 0)
  553.        {
  554.            /*Error adding job to the job_time_mgr*/
  555.            js_p->error_code = ret;
  556.            continue;
  557.        }
  558.  
  559.        /*increment once for successful post of write-ack*/
  560.        mir_op->job_count++;
  561.  
  562.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tWrite ACK recv successfully posted."
  563.                                         "\tjob_id:%d \tjob_count:%d\n"
  564.                                        ,(int)jobs[i].recv_id,mir_op->job_count);
  565.  
  566.  
  567.        /*issue flow request for the src datahandle using the session tag      */
  568.        /* obtained from the PVFS_SERV_IO request.                             */
  569.        /*                                                                     */
  570.        /*setup the flow descriptor.  Read from the src datahandle and send to */
  571.        /*the destination BMI endpoint.  The PVFS_SERV_IO has already setup the*/
  572.        /*other end of the flow (BMI src/TROVE dest).                          */
  573.        jobs[i].flow_desc = PINT_flow_alloc();
  574.        if (!jobs[i].flow_desc)
  575.        {
  576.            js_p->error_code = -PVFS_ENOMEM;
  577.            gossip_lerr("unable to allocate memory for flow descriptor");
  578.            job_bmi_cancel(jobs[i].recv_id,server_job_context);
  579.            mir_op->job_count--;
  580.            continue;
  581.        }
  582.  
  583.        PINT_flow_reset(jobs[i].flow_desc);
  584.  
  585.        jobs[i].flow_desc->src.endpoint_id     = TROVE_ENDPOINT;
  586.        jobs[i].flow_desc->src.u.trove.handle  = reqmir_p->src_handle;
  587.        jobs[i].flow_desc->src.u.trove.coll_id = reqmir_p->fs_id;
  588.  
  589.        jobs[i].flow_desc->dest.endpoint_id   = BMI_ENDPOINT;
  590.        jobs[i].flow_desc->dest.u.bmi.address = jobs[i].svr_addr;
  591.  
  592.        jobs[i].flow_desc->buffer_size      = cur_fs->fp_buffer_size;
  593.        jobs[i].flow_desc->buffers_per_flow = cur_fs->fp_buffers_per_flow;
  594.  
  595.        jobs[i].flow_desc->file_data.extend_flag = 1;
  596.        jobs[i].flow_desc->file_data.fsize       = reqmir_p->bsize;
  597.        jobs[i].flow_desc->file_data.dist        = reqmir_p->dist;
  598.        jobs[i].flow_desc->file_data.server_nr   = 0;
  599.        jobs[i].flow_desc->file_data.server_ct   = 1;
  600.  
  601.        jobs[i].flow_desc->file_req              = PVFS_BYTE;
  602.        jobs[i].flow_desc->file_req_offset       = 0;
  603.        jobs[i].flow_desc->mem_req               = NULL;
  604.  
  605.        jobs[i].flow_desc->tag                   = jobs[i].session_tag;
  606.        jobs[i].flow_desc->type                  = reqmir_p->flow_type;
  607.        jobs[i].flow_desc->user_ptr              = NULL;
  608.        jobs[i].flow_desc->aggregate_size        = reqmir_p->bsize;
  609.  
  610.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tbsize:%lld \tdatafile:nr:%d\tct:%d"
  611.                                         "\toffset:%lld \ttag:%d\n"
  612.                                        ,lld(jobs[i].flow_desc->file_data.fsize)
  613.                                        ,jobs[i].flow_desc->file_data.server_nr
  614.                                        ,jobs[i].flow_desc->file_data.server_ct
  615.                                        ,lld(jobs[i].flow_desc->file_req_offset)
  616.                                        ,jobs[i].flow_desc->tag );
  617.  
  618.        /*post the flow*/
  619.        status_user_tag = TAG(SRC_FLOW_POST,i);
  620.        ret = job_flow( jobs[i].flow_desc
  621.                       ,smcb
  622.                       ,status_user_tag
  623.                       ,&(jobs[i].flow_status)
  624.                       ,&(jobs[i].flow_job_id)
  625.                       ,server_job_context
  626.                       ,server_config->server_job_flow_timeout
  627.                       ,NULL );
  628.  
  629.       /*if the flow fails immediately, then we have to do some special        */
  630.       /*handling.  This function is not equipped to handle the failure        */
  631.       /*directly, so instead we post a null job that will propagate the error */
  632.       /*to the normal state where we interpret flow errors.                   */
  633.       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tSRC_FLOW_POST:return code:%d\n"
  634.                                       ,ret);
  635.       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tflow job id:%d\n"
  636.                                       ,(int)jobs[i].flow_job_id);
  637.       if (ret<0)
  638.       {
  639.           /* a failure occured while adding this job to the job_time_mgr*/
  640.           js_p->error_code = ret;
  641.           job_bmi_cancel(jobs[i].recv_id,server_job_context);
  642.           mir_op->job_count--;
  643.           continue;
  644.       }
  645.       else if (ret == 1 && jobs[i].flow_status.error_code == 0)
  646.       {
  647.           /*job completed immediately AND was successful*/
  648.           js_p->error_code = 0;
  649.           /*increment job_count again for successful post of flow*/
  650.           mir_op->job_count++;
  651.           continue;
  652.       }
  653.       else if (jobs[i].flow_status.error_code)
  654.       {
  655.           /*job completed immediately AND was NOT successful*/
  656.           js_p->error_code = jobs[i].flow_status.error_code;
  657.           job_bmi_cancel(jobs[i].recv_id,server_job_context);
  658.           mir_op->job_count--;
  659.           continue;      
  660.       }
  661.  
  662.       /*increment job_count again for successful post of flow*/
  663.       mir_op->job_count++;
  664.  
  665.       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsuccessfully posted flow "
  666.                                        "\tjob_count : %d...\n"
  667.                                       ,mir_op->job_count);  
  668.   }/*end for each destination handle*/
  669.  
  670.   /*if the job_count > 0, then at least one of the IO requests was successful */
  671.   /*and the submission of the write-ack and flow were also successful.        */
  672.   if (mir_op->job_count > 0)
  673.      return SM_ACTION_DEFERRED;
  674.   else
  675.   {
  676.      /*if job_count is zero, then nothing worked.*/
  677.      gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNo jobs successfully posted in "
  678.                                       "post_ack_and_flow : %d\n"
  679.                                      ,js_p->error_code);
  680.      js_p->error_code = -PVFS_EIO;
  681.      return SM_ACTION_COMPLETE;
  682.   }
  683. }/*end action post_ack_and_flow*/
  684.  
  685.  
  686. static PINT_sm_action check_comm ( struct PINT_smcb *smcb
  687.                                   ,job_status_s *js_p)
  688. {
  689.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing check_comm...\n");
  690.  
  691.    struct PINT_server_op      *s_op    = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  692.    struct PVFS_servreq_mirror *reqmir_p = &(s_op->req->u.mirror);
  693.  
  694.    PINT_server_mirror_op *mir_op   = &(s_op->u.mirror);
  695.    write_job_t *jobs = mir_op->jobs;
  696.  
  697.    job_aint status_user_tag = js_p->status_user_tag;
  698.    int ret,i;
  699.  
  700.    struct filesystem_configuration_s *cur_fs        = NULL;
  701.    struct server_configuration_s     *server_config = NULL;
  702.  
  703.    server_config = get_server_config_struct();
  704.    cur_fs = PINT_config_find_fs_id(server_config,reqmir_p->fs_id);
  705.  
  706.    /*status_user_tag's will only exist for those destination handles that had */
  707.    /*successful posts of a write-ack and flow.                                */
  708.    switch(PHASE(status_user_tag))
  709.    {
  710.        case SRC_FLOW_POST:
  711.        {
  712.            gossip_debug(GOSSIP_MIRROR_DEBUG,"\tReceived SRC_FLOW_POST for "
  713.                                             "dst(%d)...\n"
  714.                                            ,(int)DST(status_user_tag));
  715.            i = (int)DST(status_user_tag);
  716.            jobs[i].flow_status = *js_p;
  717.            mir_op->job_count--;
  718.            if (mir_op->job_count > 0)
  719.            {
  720.               ret = job_reset_timeout(jobs[i].recv_id
  721.                                      ,server_config->server_job_bmi_timeout);
  722.               if (ret == 0 || ret == -PVFS_EINVAL)
  723.               {
  724.                  gossip_debug(GOSSIP_MIRROR_DEBUG,"\ttimeout reset:%d(%0x)\n"
  725.                                                  ,ret,ret);
  726.                  /*ack was reset or it has already completed.*/
  727.               }
  728.               else
  729.               {
  730.                   gossip_lerr("Unable to reset timeout");
  731.                   return(ret);
  732.               }               
  733.            }
  734.            break;
  735.        }
  736.        case WRITE_ACK_RCV:
  737.        {
  738.            i=(int)DST(status_user_tag);
  739.            jobs[i].recv_status = *js_p;
  740.            mir_op->job_count--;
  741.            gossip_debug(GOSSIP_MIRROR_DEBUG,"\tReceived WRITE_ACK_RCV for "
  742.                                             "dst(%d)...\n"
  743.                                            ,(int)DST(status_user_tag));
  744.            gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjob_count:%d\n"
  745.                                            ,mir_op->job_count);
  746.            gossip_debug(GOSSIP_MIRROR_DEBUG,"\trecv_status.error_code:%d\n"
  747.                                            ,jobs[i].recv_status.error_code);
  748.            break;
  749.        }
  750.        default:
  751.        {
  752.            gossip_debug(GOSSIP_MIRROR_DEBUG,"\tReceived unknown:%d\n"
  753.                                            ,(int)status_user_tag);
  754.            mir_op->job_count = 0;
  755.            break;
  756.        }
  757.    }/*end switch*/
  758.  
  759.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjs_p->error_code:%d"
  760.                                     "\tjs_p->actual_size:%d\n"
  761.                                    ,js_p->error_code
  762.                                    ,(int)js_p->actual_size);
  763.  
  764.    js_p->error_code = 0;
  765.  
  766.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjob_count:%d\n",mir_op->job_count);
  767.    if (mir_op->job_count) /*more jobs to process*/
  768.    {
  769.       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjs_p->error_code:%d "
  770.                                        "\tjob_count:%d "
  771.                                        "\tleaving deferred\n"
  772.                                       ,js_p->error_code
  773.                                       ,mir_op->job_count);
  774.       return SM_ACTION_DEFERRED;
  775.    }
  776.  
  777.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tjs_p->error_code:%d"
  778.                                     "\tjob_count:%d"
  779.                                     "\tleaving complete\n"
  780.                                    ,js_p->error_code
  781.                                    ,mir_op->job_count);
  782.  
  783.    if (mir_op->job_count == 0) /*no more jobs to process*/
  784.        js_p->error_code = COMM_DONE;
  785.  
  786.    return SM_ACTION_COMPLETE;
  787. }/*end action check_comm*/
  788.  
  789.  
  790. static PINT_sm_action check_results( struct PINT_smcb *smcb
  791.                                     ,job_status_s *js_p )
  792. {
  793.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing check_results...\n");
  794.    struct PINT_server_op *s_op   = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  795.    struct PVFS_servreq_mirror  *reqmir_p  = &(s_op->req->u.mirror);
  796.    struct PVFS_servresp_mirror *respmir_p = &(s_op->resp.u.mirror);
  797.    PINT_server_mirror_op *mir_p  = &(s_op->u.mirror);
  798.    write_job_t *jobs = mir_p->jobs;
  799.  
  800.    struct PINT_decoded_msg decoded_resp = {0};
  801.    struct PVFS_server_resp *resp = NULL;
  802.  
  803.    int ret,i;
  804.  
  805.    js_p->error_code = 0;
  806.  
  807.    /*check result statuses for each destination handle*/
  808.    for (i=0; i<reqmir_p->dst_count; i++)
  809.    {
  810.       if (jobs[i].io_status != 0)
  811.       {
  812.           respmir_p->write_status_code[i] = jobs[i].io_status;
  813.       }
  814.       else if (jobs[i].recv_status.error_code)
  815.       {
  816.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tACK rcv failed:%d\n"
  817.                                           ,jobs[i].recv_status.error_code );
  818.           respmir_p->write_status_code[i] = jobs[i].recv_status.error_code;
  819.       }
  820.       else if (jobs[i].flow_status.error_code)
  821.       {
  822.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tFLOW failed:%d\n"
  823.                                           ,jobs[i].flow_status.error_code);
  824.           respmir_p->write_status_code[i] = jobs[i].flow_status.error_code;
  825.       }
  826.       else
  827.       {
  828.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tACK & FLOW succeeded\n");
  829.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\trecv_status.actual_size:%d\n"
  830.                                          ,(int)jobs[i].recv_status.actual_size);
  831.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tparameters sent into "
  832.                                            "PINT_serv_decode_resp:\n");
  833.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tfs_id : %d\n"
  834.                                            "\t\t\tencoded_resp_p : %p\n"
  835.                                            "\t\t\tdecoded_resp : %p\n"
  836.                                            "\t\t\tsvr_addr : %ld\n"
  837.                                            "\t\t\tactual_size : %d\n"
  838.                                            "\t\t\tresp : %p\n"
  839.                                           ,reqmir_p->fs_id
  840.                                           ,(void *)jobs[i].encoded_resp_p
  841.                                           ,&decoded_resp
  842.                                           ,(long int)jobs[i].svr_addr
  843.                                           ,(int)jobs[i].recv_status.actual_size
  844.                                           ,resp);
  845.           ret = PINT_serv_decode_resp( reqmir_p->fs_id
  846.                                       ,jobs[i].encoded_resp_p
  847.                                       ,&decoded_resp
  848.                                       ,&(jobs[i].svr_addr)
  849.                                       ,jobs[i].recv_status.actual_size
  850.                                       ,&resp );
  851.           gossip_debug(GOSSIP_MIRROR_DEBUG,"\tafter PINT_serv_decode_resp..\n");
  852.           if (ret == 0)
  853.           {
  854.              gossip_debug(GOSSIP_MIRROR_DEBUG,"\tsession_tag : %d\n"
  855.                                              ,jobs[i].session_tag);
  856.              respmir_p->bytes_written[i] = 
  857.                      resp->u.write_completion.total_completed;
  858.              gossip_debug(GOSSIP_MIRROR_DEBUG,"\tafter bytes_written..\n");
  859.              respmir_p->write_status_code[i] = jobs[i].recv_status.error_code;
  860.              gossip_debug(GOSSIP_MIRROR_DEBUG,"\tafter write_status_code..\n");
  861.              PINT_decode_release(&decoded_resp, PINT_DECODE_RESP);
  862.              gossip_debug(GOSSIP_MIRROR_DEBUG,"\tbytes written:%d "
  863.                                               "\tresp status:%d "
  864.                                               "\tresp op:%d\n"
  865.                                              ,respmir_p->bytes_written[i]
  866.                                              ,respmir_p->write_status_code[i]
  867.                                              ,resp->op );
  868.           }
  869.           else
  870.           {
  871.              gossip_lerr("PINT_serv_decode_resp failed(%d)",ret);
  872.              respmir_p->write_status_code[i] = ret;
  873.           }  
  874.       }/*end if*/
  875.  
  876.       if (jobs[i].flow_desc)
  877.          PINT_flow_free(jobs[i].flow_desc);
  878.       if (jobs[i].encoded_resp_p)
  879.           BMI_memfree( jobs[i].svr_addr
  880.                       ,jobs[i].encoded_resp_p
  881.                       ,mir_p->max_resp_sz
  882.                       ,BMI_RECV);
  883.    }/*end for each destination handle*/
  884.  
  885.    /*if at least ONE of the writes was successful, then return a zero to indi-*/
  886.    /*cate that ALL status codes must be checked.  If NONE of the writes were  */
  887.    /*successful, then return a non-zero to indicate that individual statuses  */
  888.    /*do not need to be checked.                                               */
  889.    for (i=0; i<reqmir_p->dst_count; i++)
  890.    {
  891.        if (respmir_p->write_status_code[i] == 0)
  892.           break;
  893.    }
  894.    if (i==reqmir_p->dst_count)
  895.        js_p->error_code = -PVFS_EIO;
  896.    else
  897.        js_p->error_code = 0;
  898.  
  899.    return SM_ACTION_COMPLETE;
  900. }/*end action check_results*/
  901.  
  902.  
  903.  
  904.  
  905.  
  906.  
  907.  
  908. static PINT_sm_action cleanup_mirror_work(struct PINT_smcb *smcb
  909.                                          ,job_status_s *js_p)
  910. {
  911.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing cleanup_mirror_work.....\n");
  912.  
  913.     struct PINT_server_op *s_op = PINT_sm_frame(smcb,PINT_FRAME_CURRENT);
  914.     struct PVFS_servresp_mirror  *respmir_p = &(s_op->resp.u.mirror);
  915.     struct PVFS_servreq_mirror   *reqmir_p  = &(s_op->req->u.mirror);
  916.     struct PINT_server_mirror_op *mir_p     = &(s_op->u.mirror);
  917.     write_job_t *jobs = mir_p->jobs;
  918.     int i,ret;
  919.     job_id_t job_id;
  920.  
  921.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tIN:js_p->error_code:%d\n"
  922.                                     ,js_p->error_code);
  923.     if (js_p->error_code == NO_DATA_TO_COPY)
  924.     {
  925.         js_p->error_code = 0;
  926.     }
  927.  
  928.     if (mir_p->jobs)
  929.     {
  930.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\tJOBS IO STATUS...\n");
  931.         for (i=0; i<reqmir_p->dst_count; i++)
  932.             gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tio_status(%d):%d\n"
  933.                                             ,i
  934.                                             ,(int)jobs[i].io_status);
  935.     }
  936.  
  937.     if (mir_p->jobs)
  938.         free(mir_p->jobs);
  939.  
  940.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tOUT:js_p->error_code:%d\n"
  941.                                     ,js_p->error_code);
  942.  
  943.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\trespmir_p->src_handle:%llu\n"
  944.                                     ,llu(respmir_p->src_handle));
  945.    
  946.     for (i=0; i<respmir_p->dst_count; i++)
  947.     {
  948.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\t\tbytes_written[%d]:%d\n"
  949.                                          "\t\t\twrite_status_code[%d]:%d\n"
  950.                                         ,i,respmir_p->bytes_written[i]
  951.                                         ,i,respmir_p->write_status_code[i]);
  952.     }
  953.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\ts_op->resp.status:%d\n"
  954.                                     ,s_op->resp.status);
  955.  
  956.     /*If s_op->schedule_id is non-zero, then we must release the object */
  957.     /*from the scheduler.                                               */
  958.     if (s_op->scheduled_id)
  959.     {
  960.        ret = job_req_sched_release(s_op->scheduled_id
  961.                                   ,smcb
  962.                                   ,0
  963.                                   ,js_p
  964.                                   ,&job_id
  965.                                   ,server_job_context);
  966.        s_op->scheduled_id = 0;
  967.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tLeaving cleanup_mirror_work.....\n");
  968.        return (ret);
  969.     }
  970.  
  971.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Leaving cleanup_mirror_work.....\n");
  972.     return SM_ACTION_COMPLETE;
  973. }/*end action cleanup_mirror_work*/
  974.  
  975.  
  976.  
  977. int write_comp_fn(void *v_p, struct PVFS_server_resp *resp_p, int i)
  978. {
  979.   /*This function executes AFTER each msgpair has completed and is under the  */
  980.   /*control of msgpairarray.sm.  Here, we will capture the response from the  */
  981.   /*PVFS_SERV_IO request; however, the response pertains only to the initial  */
  982.   /*ACK and start of the flow on the remote server.  We will retain the status*/
  983.   /*from each response and then check it when we return from the jump to      */
  984.   /*msgpairarray.  We will always return a zero from this function, even if   */
  985.   /*the request failed, so we can check it later.                             */
  986.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing write_comp_fn.....\n");
  987.  
  988.    PINT_smcb *smcb = v_p;
  989.    struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
  990.    struct PINT_server_mirror_op *mir_p = &(s_op->u.mirror);
  991.    write_job_t *jobs = mir_p->jobs;
  992.  
  993.    /*resp_p contains the response from the PVFS_SERV_IO request after it */
  994.    /*has posted its initial write ack and flow.                          */
  995.  
  996.    jobs[i].io_status = resp_p->status;
  997.  
  998.  
  999.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tInitial ACK from IO write request(%d). "
  1000.                                     "bstream_size is %d.\n"
  1001.                                    ,i
  1002.                                    ,(int)resp_p->u.io.bstream_size);
  1003.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tstatus:%d\n",(int)jobs[i].io_status);
  1004.  
  1005.    if (resp_p->status != 0)
  1006.    {
  1007.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNegative response from "
  1008.                                         "PVFS_SERV_IO:%d\n"
  1009.                                        ,resp_p->status);
  1010.    }
  1011.  
  1012.    return(0);
  1013. } /*end msgpair completion function mirror_comp_fn*/
  1014.  
  1015.  
  1016.  
  1017.  
  1018. /*set handle and fs-id ... required by the state machine processor*/
  1019. static inline int PINT_get_object_ref_mirror( struct PVFS_server_req *req
  1020.                                              ,PVFS_fs_id *fs_id
  1021.                                              ,PVFS_handle *handle )
  1022. {
  1023.    *fs_id  = req->u.mirror.fs_id;
  1024.    *handle = req->u.mirror.src_handle;
  1025.     return 0;
  1026. };
  1027.  
  1028. /*request parameters*/
  1029. struct PINT_server_req_params pvfs2_mirror_params =
  1030. {
  1031.     .string_name = "mirror",
  1032.     .perm = PINT_SERVER_CHECK_NONE,
  1033.     .access_type = PINT_server_req_modify,
  1034.     .sched_policy = PINT_SERVER_REQ_SCHEDULE,
  1035.     .get_object_ref = PINT_get_object_ref_mirror,
  1036.     .state_machine = &pvfs2_mirror_sm
  1037. };
  1038. /****************************end of file***************************************/
  1039.  
  1040.  
  1041.  
  1042.  
  1043.  
  1044.  
  1045.