home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / server / io.sm < prev    next >
Text File  |  2009-09-03  |  13KB  |  434 lines

  1. /* 
  2.  * (C) 2001 Clemson University and The University of Chicago 
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. /*
  8.  *  PVFS2 server state machine for driving I/O operations (read and write).
  9.  */
  10.  
  11. #include <string.h>
  12. #include <assert.h>
  13.  
  14. #include "server-config.h"
  15. #include "pvfs2-server.h"
  16. #include "pvfs2-attr.h"
  17. #include "pvfs2-request.h"
  18. #include "pint-distribution.h"
  19. #include "pint-request.h"
  20. #include "pvfs2-internal.h"
  21.  
  22. %%
  23.  
  24. machine pvfs2_io_sm
  25. {
  26.     state prelude
  27.     {
  28.         jump pvfs2_prelude_sm;
  29.         success => send_positive_ack;
  30.         default => send_negative_ack;
  31.     }
  32.  
  33.     state send_positive_ack
  34.     {
  35.         run io_send_ack;
  36.         success => start_flow;
  37.         default => release;
  38.     }
  39.  
  40.     state send_negative_ack
  41.     {
  42.         run io_send_ack;
  43.         default => release;
  44.     }
  45.  
  46.     state start_flow
  47.     {
  48.         run io_start_flow;
  49.         success => send_completion_ack;
  50.         default => release;
  51.     }
  52.  
  53.     state send_completion_ack
  54.     {
  55.         run io_send_completion_ack;
  56.         default => release;
  57.     }
  58.  
  59.     state release
  60.     {
  61.         run io_release;
  62.         default => cleanup;
  63.     }
  64.  
  65.     state cleanup
  66.     {
  67.         run io_cleanup;
  68.         default => terminate;
  69.     }
  70. }
  71.  
  72. %%
  73.  
  74. /*
  75.  * Function: io_send_ack()
  76.  *
  77.  * Params:   server_op *s_op, 
  78.  *           job_status_s* js_p
  79.  *
  80.  * Pre:      error code has been set in job status for us to
  81.  *           report to client
  82.  *
  83.  * Post:     response has been sent to client
  84.  *            
  85.  * Returns:  int
  86.  *
  87.  * Synopsis: fills in a response to the I/O request, encodes it,
  88.  *           and sends it to the client via BMI.  Note that it may
  89.  *           send either positive or negative acknowledgements.
  90.  *           
  91.  */
  92. static int io_send_ack(
  93.         struct PINT_smcb *smcb, job_status_s *js_p)
  94. {
  95.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing io_send_ack (io.sm)....\n");
  96.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  97.     int err = -PVFS_EIO;
  98.     job_id_t tmp_id;
  99.     struct server_configuration_s *user_opts = get_server_config_struct();
  100.         
  101.     /* this is where we report the file size to the client before
  102.      * starting the I/O transfer, or else report an error if we
  103.      * failed to get the size, or failed for permission reasons
  104.      */
  105.     s_op->resp.status = js_p->error_code;
  106.     s_op->resp.u.io.bstream_size = s_op->ds_attr.u.datafile.b_size;
  107.  
  108.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tbstream_size:%d\n"
  109.                                     ,(int)s_op->resp.u.io.bstream_size);
  110.  
  111.     err = PINT_encode(&s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
  112.                       s_op->addr, s_op->decoded.enc_type);
  113.     if (err < 0)
  114.     {
  115.         gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
  116.         js_p->error_code = err;
  117.         return SM_ACTION_COMPLETE;
  118.     }
  119.  
  120.     err = job_bmi_send_list(
  121.         s_op->addr, s_op->encoded.buffer_list, s_op->encoded.size_list,
  122.         s_op->encoded.list_count, s_op->encoded.total_size,
  123.         s_op->tag, s_op->encoded.buffer_type, 0, smcb, 0, js_p,
  124.         &tmp_id, server_job_context, user_opts->server_job_bmi_timeout,
  125.         s_op->req->hints);
  126.  
  127.     return err;
  128. }
  129.  
  130. /*
  131.  * Function: io_start_flow()
  132.  *
  133.  * Params:   server_op *s_op, 
  134.  *           job_status_s* js_p
  135.  *
  136.  * Pre:      all of the previous steps have succeeded, so that we
  137.  *           are ready to actually perform the I/O
  138.  *
  139.  * Post:     I/O has been carried out
  140.  *            
  141.  * Returns:  int
  142.  *
  143.  * Synopsis: this is the most important part of the state machine.
  144.  *           we setup the flow descriptor and post it in order to 
  145.  *           carry out the data transfer
  146.  *           
  147.  */
  148. static PINT_sm_action io_start_flow(
  149.         struct PINT_smcb *smcb, job_status_s *js_p)
  150. {
  151.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  152.     int err = -PVFS_EIO;
  153.     job_id_t tmp_id;
  154.     struct server_configuration_s *user_opts = get_server_config_struct();
  155.     struct filesystem_configuration_s *fs_conf;
  156.         
  157.     s_op->u.io.flow_d = PINT_flow_alloc();
  158.     if (!s_op->u.io.flow_d)
  159.     {
  160.         js_p->error_code = -PVFS_ENOMEM;
  161.         return SM_ACTION_COMPLETE;
  162.     }
  163.  
  164.     s_op->u.io.flow_d->hints = s_op->req->hints;
  165.  
  166.     /* we still have the file size stored in the response structure 
  167.      * that we sent in the previous state, other details come from
  168.      * request
  169.      */
  170.     s_op->u.io.flow_d->file_data.fsize = s_op->resp.u.io.bstream_size;
  171.     s_op->u.io.flow_d->file_data.dist = s_op->req->u.io.io_dist;
  172.     s_op->u.io.flow_d->file_data.server_nr = s_op->req->u.io.server_nr;
  173.     s_op->u.io.flow_d->file_data.server_ct = s_op->req->u.io.server_ct;
  174.  
  175.     /* on writes, we allow the bstream to be extended at EOF */
  176.     if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
  177.     {
  178.         gossip_debug(GOSSIP_IO_DEBUG, "io_start_flow() issuing flow to "
  179.                      "write data.\n");
  180.         s_op->u.io.flow_d->file_data.extend_flag = 1;
  181.     }
  182.     else
  183.     {
  184.         gossip_debug(GOSSIP_IO_DEBUG, "io_start_flow() issuing flow to "
  185.                      "read data.\n");
  186.         s_op->u.io.flow_d->file_data.extend_flag = 0;
  187.     }
  188.  
  189.     s_op->u.io.flow_d->file_req = s_op->req->u.io.file_req;
  190.     s_op->u.io.flow_d->file_req_offset = s_op->req->u.io.file_req_offset;
  191.     s_op->u.io.flow_d->mem_req = NULL;
  192.     s_op->u.io.flow_d->aggregate_size = s_op->req->u.io.aggregate_size;
  193.     s_op->u.io.flow_d->tag = s_op->tag;
  194.     s_op->u.io.flow_d->user_ptr = NULL;
  195.     s_op->u.io.flow_d->type = s_op->req->u.io.flow_type;
  196.  
  197.     fs_conf = PINT_config_find_fs_id(user_opts, 
  198.         s_op->req->u.io.fs_id);
  199.     if(fs_conf)
  200.     {
  201.         /* pick up any buffer settings overrides from fs conf */
  202.         s_op->u.io.flow_d->buffer_size = fs_conf->fp_buffer_size;
  203.         s_op->u.io.flow_d->buffers_per_flow = fs_conf->fp_buffers_per_flow;
  204.     }
  205.  
  206.     gossip_debug(GOSSIP_IO_DEBUG, "flow: fsize: %lld, " 
  207.         "server_nr: %d, server_ct: %d\n",
  208.         lld(s_op->u.io.flow_d->file_data.fsize),
  209.         (int)s_op->u.io.flow_d->file_data.server_nr,
  210.         (int)s_op->u.io.flow_d->file_data.server_ct);
  211.  
  212.     gossip_debug(GOSSIP_IO_DEBUG, "      file_req_offset: %lld, "
  213.         "aggregate_size: %lld, handle: %llu\n", 
  214.         lld(s_op->u.io.flow_d->file_req_offset),
  215.         lld(s_op->u.io.flow_d->aggregate_size),
  216.         llu(s_op->req->u.io.handle));
  217.  
  218.     /* set endpoints depending on type of io requested */
  219.     if (s_op->req->u.io.io_type == PVFS_IO_WRITE)
  220.     {
  221.         s_op->u.io.flow_d->src.endpoint_id = BMI_ENDPOINT;
  222.         s_op->u.io.flow_d->src.u.bmi.address = s_op->addr;
  223.         s_op->u.io.flow_d->dest.endpoint_id = TROVE_ENDPOINT;
  224.         s_op->u.io.flow_d->dest.u.trove.handle = s_op->req->u.io.handle;
  225.         s_op->u.io.flow_d->dest.u.trove.coll_id = s_op->req->u.io.fs_id;
  226.     }
  227.     else if (s_op->req->u.io.io_type == PVFS_IO_READ)
  228.     {
  229.         s_op->u.io.flow_d->src.endpoint_id = TROVE_ENDPOINT;
  230.         s_op->u.io.flow_d->src.u.trove.handle = s_op->req->u.io.handle;
  231.         s_op->u.io.flow_d->src.u.trove.coll_id = s_op->req->u.io.fs_id;
  232.         s_op->u.io.flow_d->dest.endpoint_id = BMI_ENDPOINT;
  233.         s_op->u.io.flow_d->dest.u.bmi.address = s_op->addr;
  234.     }
  235.     else
  236.     {
  237.         gossip_lerr("Server: IO SM: unknown IO type requested.\n");
  238.         js_p->error_code = -PVFS_EINVAL;
  239.         return SM_ACTION_COMPLETE;
  240.     }
  241.  
  242.     gossip_debug(GOSSIP_IO_DEBUG,"\tabout to issue job_flow...\n");
  243.     err = job_flow(s_op->u.io.flow_d, smcb, 0, js_p, &tmp_id,
  244.                    server_job_context, user_opts->server_job_flow_timeout
  245.                    , s_op->req->hints);
  246.  
  247.     gossip_debug(GOSSIP_IO_DEBUG,"\treturn code from job_flow "
  248.                                  "submission:%d\n"
  249.                                 ,err);
  250.  
  251.     return err;
  252. }
  253.  
  254. /*
  255.  * Function: io_release()
  256.  *
  257.  * Params:   server_op *b, 
  258.  *           job_status_s* js_p
  259.  *
  260.  * Pre:      we are done with all steps necessary to service
  261.  *           request
  262.  *
  263.  * Post:     operation has been released from the scheduler
  264.  *
  265.  * Returns:  int
  266.  *
  267.  * Synopsis: releases the operation from the scheduler
  268.  */
  269. static PINT_sm_action io_release(
  270.         struct PINT_smcb *smcb, job_status_s *js_p)
  271. {
  272.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  273.     int ret = 0;
  274.     job_id_t i;
  275.  
  276.     /*
  277.       tell the scheduler that we are done with this operation (if it
  278.       was scheduled in the first place)
  279.     */
  280.     ret = job_req_sched_release(
  281.         s_op->scheduled_id, smcb, 0, js_p, &i, server_job_context);
  282.     return ret;
  283. }
  284.  
  285. /*
  286.  * Function: io_cleanup()
  287.  *
  288.  * Params:   server_op *b, 
  289.  *           job_status_s* js_p
  290.  *
  291.  * Pre:      all jobs done, simply need to clean up
  292.  *
  293.  * Post:     everything is free
  294.  *
  295.  * Returns:  int
  296.  *
  297.  * Synopsis: free up any buffers associated with the operation,
  298.  *           including any encoded or decoded protocol structures
  299.  */
  300. static PINT_sm_action io_cleanup(
  301.         struct PINT_smcb *smcb, job_status_s *js_p)
  302. {
  303.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  304.     char status_string[64] = {0};
  305.  
  306.     PVFS_strerror_r(s_op->resp.status, status_string, 64);
  307.     PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "finish (%s)\n", status_string);
  308.  
  309.     if (s_op->u.io.flow_d)
  310.     {
  311.         PINT_flow_free(s_op->u.io.flow_d);
  312.     }
  313.  
  314.     /* let go of our encoded response buffer, if we appear to have
  315.      * made one
  316.      */
  317.     if (s_op->encoded.total_size)
  318.     {
  319.         PINT_encode_release(&s_op->encoded, PINT_ENCODE_RESP);
  320.     }
  321.  
  322.     return(server_state_machine_complete(smcb));
  323. }
  324.  
  325. /*
  326.  * Function: io_send_completion_ack()
  327.  *
  328.  * Params:   server_op *s_op, 
  329.  *           job_status_s* js_p
  330.  *
  331.  * Pre:      flow is completed so that we can report its status
  332.  *
  333.  * Post:     if this is a write, response has been sent to client
  334.  *           if this is a read, do nothing
  335.  *            
  336.  * Returns:  int
  337.  *
  338.  * Synopsis: fills in a response to the I/O request, encodes it,
  339.  *           and sends it to the client via BMI.  Note that it may
  340.  *           send either positive or negative acknowledgements.
  341.  *           
  342.  */
  343. static PINT_sm_action io_send_completion_ack(
  344.         struct PINT_smcb *smcb, job_status_s *js_p)
  345. {
  346.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  347.     int err = -PVFS_EIO;
  348.     job_id_t tmp_id;
  349.     struct server_configuration_s *user_opts = get_server_config_struct();
  350.     
  351.     gossip_debug(GOSSIP_IO_DEBUG,"Executing io_send_completion_ack.\n");
  352.     
  353.     /* we only send this trailing ack if we are working on a write
  354.      * operation; otherwise just cut out early
  355.      */
  356.     if (s_op->req->u.io.io_type == PVFS_IO_READ)
  357.     {
  358.         js_p->error_code = 0;
  359.         return SM_ACTION_COMPLETE;
  360.     }
  361.  
  362.     /* release encoding of the first ack that we sent */
  363.     PINT_encode_release(&s_op->encoded, PINT_ENCODE_RESP);
  364.  
  365.     /* zero size for safety */
  366.     s_op->encoded.total_size = 0;
  367.  
  368.     /*
  369.       fill in response -- status field is the only generic one we
  370.       should have to set
  371.     */
  372.     s_op->resp.op = PVFS_SERV_WRITE_COMPLETION;  /* not IO */
  373.     s_op->resp.status = js_p->error_code;
  374.     s_op->resp.u.write_completion.total_completed =
  375.         s_op->u.io.flow_d->total_transferred;
  376.  
  377.     err = PINT_encode(
  378.         &s_op->resp, PINT_ENCODE_RESP, &(s_op->encoded),
  379.         s_op->addr, s_op->decoded.enc_type);
  380.  
  381.     if (err < 0)
  382.     {
  383.         gossip_lerr("Server: IO SM: PINT_encode() failure.\n");
  384.         js_p->error_code = err;
  385.         return SM_ACTION_COMPLETE;
  386.     }
  387.  
  388.     gossip_debug(GOSSIP_IO_DEBUG,"\ts_op->tag:%d\n",s_op->tag);
  389.  
  390.     err = job_bmi_send_list(
  391.         s_op->addr, s_op->encoded.buffer_list, s_op->encoded.size_list,
  392.         s_op->encoded.list_count, s_op->encoded.total_size, s_op->tag,
  393.         s_op->encoded.buffer_type, 0, smcb, 0, js_p, &tmp_id,
  394.         server_job_context, user_opts->server_job_bmi_timeout,
  395.         s_op->req->hints);
  396.  
  397.     gossip_debug(GOSSIP_IO_DEBUG,"return code from sending ack:%d\n"
  398.                                 ,err);
  399.  
  400.     return err;
  401. }
  402.  
  403. static enum PINT_server_req_access_type PINT_server_req_access_io(
  404.     struct PVFS_server_req *req)
  405. {
  406.     if(req->u.io.io_type == PVFS_IO_READ)
  407.     {
  408.         return PINT_SERVER_REQ_READONLY;
  409.     }
  410.     return PINT_SERVER_REQ_MODIFY;
  411. }
  412.  
  413. PINT_GET_OBJECT_REF_DEFINE(io);
  414.  
  415. struct PINT_server_req_params pvfs2_io_params =
  416. {
  417.     .string_name = "io",
  418.     .perm = PINT_SERVER_CHECK_NONE,
  419.     .access_type = PINT_server_req_access_io,
  420.     .sched_policy = PINT_SERVER_REQ_SCHEDULE,
  421.     .get_object_ref = PINT_get_object_ref_io,
  422.     .state_machine = &pvfs2_io_sm
  423. };
  424.  
  425. /*
  426.  * Local variables:
  427.  *  mode: c
  428.  *  c-indent-level: 4
  429.  *  c-basic-offset: 4
  430.  * End:
  431.  *
  432.  * vim: ft=c ts=8 sts=4 sw=4 expandtab
  433.  */
  434.