home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
ftp.parl.clemson.edu
/
2015-02-07.ftp.parl.clemson.edu.tar
/
ftp.parl.clemson.edu
/
pub
/
pvfs2
/
orangefs-2.8.3-20110323.tar.gz
/
orangefs-2.8.3-20110323.tar
/
orangefs
/
src
/
server
/
unstuff.sm
< prev
Wrap
Text File
|
2010-09-20
|
13KB
|
501 lines
/*
* (C) 2001 Clemson University and The University of Chicago
*
* See COPYING in top-level directory.
*
* Changes by Acxiom Corporation to add dirent_count field to attributes
* Copyright © Acxiom Corporation, 2005.
*/
#include <string.h>
#include <assert.h>
#include "server-config.h"
#include "pvfs2-server.h"
#include "pvfs2-attr.h"
#include "pvfs2-types.h"
#include "pvfs2-types-debug.h"
#include "pvfs2-util.h"
#include "pint-util.h"
#include "pvfs2-internal.h"
#include "pint-cached-config.h"
#define STATE_UNSTUFF 33
%%
machine pvfs2_unstuff_sm
{
state prelude
{
jump pvfs2_prelude_sm;
success => getattr_setup;
default => final_response;
}
state getattr_setup
{
run getattr_setup;
success => getattr_do_work;
default => final_response;
}
state getattr_do_work
{
jump pvfs2_get_attr_work_sm;
default => getattr_interpret;
}
state getattr_interpret
{
run getattr_interpret;
STATE_UNSTUFF => get_keyvals;
default => final_response;
}
state get_keyvals
{
run get_keyvals;
success => inspect_keyvals;
default => final_response;
}
state inspect_keyvals
{
run inspect_keyvals;
success => get_handles;
default => final_response;
}
state get_handles
{
run get_handles;
default => set_handles_on_object;
}
state set_handles_on_object
{
run set_handles_on_object;
success => update_dfile_count;
default => final_response;
}
state update_dfile_count
{
run update_dfile_count;
success => remove_layout;
default => final_response;
}
state remove_layout
{
run remove_layout;
default => final_response;
}
state final_response
{
jump pvfs2_final_response_sm;
default => cleanup;
}
state cleanup
{
run cleanup;
default => terminate;
}
}
%%
static PINT_sm_action get_keyvals(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int kind = 0;
int ret;
job_id_t job_id;
s_op->keyval_count = 2;
s_op->key_a = malloc(sizeof(*s_op->key_a) * s_op->keyval_count);
if(!s_op->key_a)
{
js_p->error_code = -PVFS_ENOMEM;
goto error_exit;
}
s_op->val_a = malloc(sizeof(*s_op->val_a) * s_op->keyval_count);
if(!s_op->val_a)
{
js_p->error_code = -PVFS_ENOMEM;
goto free_key_array;
}
s_op->error_a = malloc(sizeof(*s_op->error_a) * s_op->keyval_count);
if(!s_op->error_a)
{
js_p->error_code = -PVFS_ENOMEM;
goto free_val_array;
}
memset(s_op->error_a, 0, sizeof(*s_op->error_a) * s_op->keyval_count);
s_op->u.unstuff.encoded_layout = malloc(PVFS_REQ_LIMIT_LAYOUT);
if(!s_op->u.unstuff.encoded_layout)
{
js_p->error_code = -PVFS_ENOMEM;
goto free_encoded_layout;
}
memset(s_op->u.unstuff.encoded_layout, 0, PVFS_REQ_LIMIT_LAYOUT);
/* kind = 0 */
s_op->key_a[kind].buffer = Trove_Common_Keys[METAFILE_LAYOUT_KEY].key;
s_op->key_a[kind].buffer_sz = Trove_Common_Keys[METAFILE_LAYOUT_KEY].size;
s_op->val_a[kind].buffer = s_op->u.unstuff.encoded_layout;
s_op->val_a[kind].buffer_sz = PVFS_REQ_LIMIT_LAYOUT;
++kind;
/* kind = 1 */
s_op->key_a[kind].buffer = Trove_Common_Keys[NUM_DFILES_REQ_KEY].key;
s_op->key_a[kind].buffer_sz = Trove_Common_Keys[NUM_DFILES_REQ_KEY].size;
s_op->val_a[kind].buffer = &s_op->u.unstuff.num_dfiles_req;
s_op->val_a[kind].buffer_sz = sizeof(s_op->u.unstuff.num_dfiles_req);
ret = job_trove_keyval_read_list(
s_op->req->u.unstuff.fs_id,
s_op->req->u.unstuff.handle,
s_op->key_a, s_op->val_a, s_op->error_a, s_op->keyval_count,
0, NULL, smcb, 0, js_p, &job_id, server_job_context,
s_op->req->hints);
return ret;
free_encoded_layout:
free(s_op->u.unstuff.encoded_layout);
s_op->u.unstuff.encoded_layout = NULL;
free_val_array:
free(s_op->val_a);
s_op->val_a = NULL;
free_key_array:
free(s_op->key_a);
s_op->key_a = NULL;
error_exit:
return SM_ACTION_COMPLETE;
}
static PINT_sm_action inspect_keyvals(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
char* tmpbuf;
if(js_p->error_code == 0)
{
/* check keys; we have a big problem if one of them is missing */
if(s_op->error_a[0])
{
js_p->error_code = s_op->error_a[0];
}
else if(s_op->error_a[1])
{
js_p->error_code = s_op->error_a[1];
}
if(js_p->error_code == 0)
{
/* sanity check num dfiles */
if(s_op->u.unstuff.num_dfiles_req < 1)
{
js_p->error_code = -PVFS_EINVAL;
}
/* decode layout information */
tmpbuf = s_op->u.unstuff.encoded_layout;
decode_PVFS_sys_layout(&tmpbuf, &s_op->u.unstuff.layout);
}
}
/* pass along error code for next state to handle */
return SM_ACTION_COMPLETE;
}
static PINT_sm_action get_handles(
struct PINT_smcb *smcb, job_status_s *js_p)
{
int ret;
job_id_t j_id;
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
if(s_op->u.unstuff.layout.algorithm != PVFS_SYS_LAYOUT_ROUND_ROBIN)
{
/* see create.sm; for now the only layout that we use stuffing on is
* ROUND_ROBIN. The storage format supports other layouts if we
* want to add support for others later
*/
gossip_err("Error: unstuff doesn't support layout algorithm: %d\n",
s_op->u.unstuff.layout.algorithm);
js_p->error_code = -PVFS_ENOSYS;
return SM_ACTION_COMPLETE;
}
/* allocate room for final number of handles we want */
s_op->u.unstuff.dfile_array =
malloc(s_op->u.unstuff.num_dfiles_req * sizeof(PVFS_handle));
if(!s_op->u.unstuff.dfile_array)
{
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
/* the very first handle should be our current stuffed handle */
s_op->u.unstuff.dfile_array[0]
= s_op->resp.u.unstuff.attr.u.meta.dfile_array[0];
if(s_op->u.unstuff.num_dfiles_req == 1)
{
/* special case; we are unstuffing to 1 datafile. There is no need
* to retrieve any additional handles
*/
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
/* NOTE: we are leaving the existing resp attr structure alone until we
* get a successful answer from get_handles() and commit to disk
*/
ret = job_precreate_pool_get_handles(
s_op->req->u.unstuff.fs_id,
(s_op->u.unstuff.num_dfiles_req-1),
PVFS_TYPE_DATAFILE,
NULL,
&s_op->u.unstuff.dfile_array[1],
0,
smcb,
0,
js_p,
&j_id,
server_job_context,
s_op->req->hints);
return ret;
}
static PINT_sm_action set_handles_on_object(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
job_id_t j_id;
gossip_debug(GOSSIP_SERVER_DEBUG, "job_precreate_pool_get_handles() returned %d\n", js_p->error_code);
if(js_p->error_code < 0)
{
/* we failed to retrieve any handles */
if(s_op->u.unstuff.dfile_array)
{
free(s_op->u.unstuff.dfile_array);
/* preserve error code */
return SM_ACTION_COMPLETE;
}
}
/* replace dfile information in attr structure */
free(s_op->resp.u.unstuff.attr.u.meta.dfile_array);
s_op->resp.u.unstuff.attr.u.meta.dfile_array
= s_op->u.unstuff.dfile_array;
s_op->resp.u.unstuff.attr.u.meta.dfile_count
= s_op->u.unstuff.num_dfiles_req;
/* write new datafile handles to disk */
s_op->key.buffer = Trove_Common_Keys[METAFILE_HANDLES_KEY].key;
s_op->key.buffer_sz = Trove_Common_Keys[METAFILE_HANDLES_KEY].size;
s_op->val.buffer =
s_op->resp.u.unstuff.attr.u.meta.dfile_array;
s_op->val.buffer_sz =
s_op->resp.u.unstuff.attr.u.meta.dfile_count * sizeof(PVFS_handle);
return job_trove_keyval_write(
s_op->req->u.unstuff.fs_id,
s_op->req->u.unstuff.handle,
&s_op->key,
&s_op->val,
0, NULL, smcb, 0, js_p, &j_id, server_job_context,
s_op->req->hints);
}
static PINT_sm_action update_dfile_count(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
job_id_t j_id;
/* take the current on-disk attribute in object_attr form
* (acquired from prelude) with the modified datafile array,
* and convert it to the dspace form for writing.
*/
PVFS_object_attr_to_ds_attr(&s_op->resp.u.unstuff.attr, &s_op->ds_attr);
return job_trove_dspace_setattr(
s_op->req->u.unstuff.fs_id, s_op->req->u.unstuff.handle,
&s_op->ds_attr,
TROVE_SYNC,
smcb, 0, js_p, &j_id, server_job_context,
s_op->req->hints);
}
static PINT_sm_action remove_layout(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
job_id_t j_id;
/* remove the layout and num_dfiles_req keyvals as the layout has
* now been chosen.
*/
return job_trove_keyval_remove_list(
s_op->req->u.unstuff.fs_id, s_op->req->u.unstuff.handle,
s_op->key_a,
s_op->val_a,
s_op->error_a,
2,
TROVE_SYNC, NULL,
smcb, 0, js_p, &j_id, server_job_context,
s_op->req->hints);
}
static PINT_sm_action cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
if(s_op->u.unstuff.layout.server_list.servers)
{
free(s_op->u.unstuff.layout.server_list.servers);
}
if(s_op->u.unstuff.encoded_layout)
{
free(s_op->u.unstuff.encoded_layout);
}
if(s_op->val_a)
{
free(s_op->val_a);
}
if(s_op->key_a)
{
free(s_op->key_a);
}
if(s_op->error_a)
{
free(s_op->error_a);
}
PINT_free_object_attr(&s_op->resp.u.getattr.attr);
return (server_state_machine_complete(smcb));
}
static PINT_sm_action getattr_setup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct PINT_server_op *getattr_op;
int ret;
js_p->error_code = 0;
getattr_op = malloc(sizeof(*getattr_op));
if(!getattr_op)
{
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
memset(getattr_op, 0, sizeof(*getattr_op));
/* TODO: can we come up with a way to clean up and nail down what has
* to be set in order to run this nested machine? This seems fragile.
*/
/* need attrs that the prelude read already */
getattr_op->attr = s_op->attr;
/* need a valid request structure for some generic features like access
* logging
*/
getattr_op->req = s_op->req;
/* need to fill in the input parameters to the getattr nested machine */
getattr_op->u.getattr.fs_id = s_op->req->u.unstuff.fs_id;
getattr_op->u.getattr.handle = s_op->req->u.unstuff.handle;
getattr_op->u.getattr.attrmask = s_op->req->u.unstuff.attrmask;
ret = PINT_sm_push_frame(smcb, 0, getattr_op);
if(ret < 0)
{
js_p->error_code = ret;
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action getattr_interpret(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_server_op *getattr_op;
struct PINT_server_op *s_op;
int task_id;
int remaining;
getattr_op = PINT_sm_pop_frame(smcb, &task_id, &js_p->error_code,
&remaining);
s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
s_op->resp.u.unstuff.attr = getattr_op->resp.u.getattr.attr;
free(getattr_op);
if(js_p->error_code)
{
gossip_debug(GOSSIP_SERVER_DEBUG,
"unstuff failed to retrieve existing attrs.\n");
return(SM_ACTION_COMPLETE);
}
if(s_op->resp.u.unstuff.attr.mask & PVFS_ATTR_META_UNSTUFFED)
{
gossip_debug(GOSSIP_SERVER_DEBUG,
"unstuff found file already unstuffed; return existing attrs.\n");
js_p->error_code = 0;
return(SM_ACTION_COMPLETE);
}
/*
gossip_err("Attributes show file as stuffed.\n");
*/
gossip_debug(GOSSIP_SERVER_DEBUG,
"unstuff found stuffed file.\n");
js_p->error_code = STATE_UNSTUFF;
return SM_ACTION_COMPLETE;
}
PINT_GET_OBJECT_REF_DEFINE(unstuff);
struct PINT_server_req_params pvfs2_unstuff_params =
{
.string_name = "unstuff",
.perm = PINT_SERVER_CHECK_ATTR,
.access_type = PINT_server_req_modify,
.sched_policy = PINT_SERVER_REQ_SCHEDULE,
.get_object_ref = PINT_get_object_ref_unstuff,
.state_machine = &pvfs2_unstuff_sm
};
/*
* Local variables:
* mode: c
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/