"SfR Fresh" - the SfR Freeware/Shareware Archive

Member "pvfs-2.7.1/src/client/sysint/sys-io.c" of archive pvfs-2.7.1.tar.gz:


As a special service, "SfR Fresh" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. The same can be done for any archive member file by clicking, within an archive contents listing, on the first character of the file (path) or on the corresponding byte-size field.
    1 /* WARNING: THIS FILE IS AUTOMATICALLY GENERATED FROM A .SM FILE.
    2  * Changes made here will certainly be overwritten.
    3  */
    4 
    5 /*
    6  * (C) 2003 Clemson University and The University of Chicago
    7  *
    8  * See COPYING in top-level directory.
    9  */
   10 
   11 /** \file
   12  *  \ingroup sysint
   13  *
   14  *  PVFS2 system interface routines for reading and writing files.
   15  */
   16 
   17 #include <string.h>
   18 #include <assert.h>
   19 
   20 #include "client-state-machine.h"
   21 #include "pvfs2-debug.h"
   22 #include "job.h"
   23 #include "gossip.h"
   24 #include "str-utils.h"
   25 #include "pint-cached-config.h"
   26 #include "PINT-reqproto-encode.h"
   27 #include "pint-util.h"
   28 #include "pvfs2-internal.h"
   29 
   30 #define IO_MAX_SEGMENT_NUM 50
   31 
   32 extern job_context_id pint_client_sm_context;
   33 
/*
 * Non-error result codes that the state actions in this file store in
 * js_p->error_code; the transition tables match on them to select the
 * next state.
 * NOTE(review): the starting value 132 is presumably chosen so these codes
 * cannot be mistaken for 0 or other return values -- confirm.
 */
enum
{
    IO_NO_DATA = 132,               /* setup found no target datafile; routed to cleanup */
    IO_DATAFILE_TRANSFERS_COMPLETE, /* routed to io_analyze_results */
    IO_RETRY,                       /* io_analyze_results -> init; init posts a delay timer */
    IO_RETRY_NODELAY,               /* checked by io_init alongside IO_RETRY */
    IO_GET_DATAFILE_SIZE,           /* io_analyze_results -> io_datafile_size */
    IO_ANALYZE_SIZE_RESULTS,        /* io_analyze_results -> io_analyze_size_results */
    IO_DO_SMALL_IO                  /* setup -> nested small_io machine */
};
   44 
   45 /* Helper functions local to sys-io.sm. */
   46 
   47 static inline int io_complete_context_send_or_recv(
   48     PINT_smcb *smcb, job_status_s *js_p);
   49 
   50 static inline int io_decode_ack_response(
   51     PINT_client_io_ctx *cur_ctx,
   52     struct PINT_decoded_msg *decoded_resp,
   53     struct PVFS_server_resp **resp);
   54 
   55 static inline int io_post_flow(
   56     PINT_smcb *smcb, PINT_client_io_ctx *cur_ctx);
   57 
   58 static inline int io_post_write_ack_recv(
   59     PINT_smcb *smcb, PINT_client_io_ctx * cur_ctx);
   60 
   61 static inline int io_process_context_recv(
   62     PINT_client_sm *sm_p, job_status_s *js_p, PINT_client_io_ctx **out_ctx);
   63 
   64 static inline int io_check_context_status(
   65     PINT_client_io_ctx *cur_ctx, int io_type,
   66     PVFS_size *total_size);
   67 
   68 static int io_find_target_datafiles(
   69     PVFS_Request mem_req,
   70     PVFS_Request file_req,
   71     PVFS_offset file_req_offset,
   72     PINT_dist *dist_p,
   73     PVFS_fs_id fs_id,
   74     enum PVFS_io_type io_type,
   75     PVFS_handle *input_handle_array,
   76     int input_handle_count,
   77     int *handle_index_array,
   78     int *handle_index_out_count,
   79     int *sio_handle_index_array,
   80     int *sio_handle_index_count);
   81 
   82 static int io_find_total_size(
   83     PINT_client_sm * sm_p,
   84     PVFS_offset final_offset,
   85     PVFS_size * total_return_size);
   86 
   87 static int io_find_offset(
   88     PINT_client_sm * sm_p,
   89     PVFS_size contig_size,
   90     PVFS_size * total_return_offset);
   91 
   92 static int io_get_max_unexp_size(
   93     struct PINT_Request * file_req,
   94     PVFS_handle handle,
   95     PVFS_fs_id fs_id,
   96     enum PVFS_io_type type,
   97     int * max_unexp_payload);
   98 
   99 static int io_zero_fill_holes(
  100     PINT_client_sm *sm_p,
  101     PVFS_size eof,
  102     int datafile_count,
  103     PVFS_size * datafile_size_array,
  104     int * datafile_index_array);
  105 
  106 static int io_contexts_init(PINT_client_sm *sm_p, int count,
  107                             PVFS_object_attr *attr);
  108 
  109 static void io_contexts_destroy(PINT_client_sm *sm_p);
  110 
  111 /* misc constants and helper macros */
  112 #define IO_RECV_COMPLETED                                    1
  113 
  114 /* possible I/O state machine phases (status_user_tag) */
  115 #define IO_SM_PHASE_REQ_MSGPAIR_RECV                         0
  116 #define IO_SM_PHASE_REQ_MSGPAIR_SEND                         1
  117 #define IO_SM_PHASE_FLOW                                     2
  118 #define IO_SM_PHASE_FINAL_ACK                                3
  119 #define IO_SM_NUM_PHASES                                     4
  120 
  121 #define STATUS_USER_TAG_TYPE(tag, type)                      \
  122 ((tag % IO_SM_NUM_PHASES) == type)
  123 #define STATUS_USER_TAG_GET_INDEX(tag, type)                 \
  124 (tag / IO_SM_NUM_PHASES)
  125 #define STATUS_USER_TAG_IS_SEND_OR_RECV(tag)                 \
  126 (STATUS_USER_TAG_TYPE(tag, IO_SM_PHASE_REQ_MSGPAIR_RECV) ||  \
  127  STATUS_USER_TAG_TYPE(tag, IO_SM_PHASE_REQ_MSGPAIR_SEND))
  128 
  129 static int io_datafile_index_array_init(
  130     PINT_client_sm *sm_p,
  131     int datafile_count);
  132 
  133 static void io_datafile_index_array_destroy(
  134     PINT_client_sm *sm_p);
  135 
  136 
  137 static PINT_sm_action io_init(
  138 	struct PINT_smcb *smcb, job_status_s *js_p);
  139 
  140 static struct PINT_state_s ST_init;
  141 static struct PINT_pjmp_tbl_s ST_init_pjtbl[];
  142 static struct PINT_tran_tbl_s ST_init_trtbl[];
  143 static struct PINT_state_s ST_io_getattr;
  144 static struct PINT_pjmp_tbl_s ST_io_getattr_pjtbl[];
  145 static struct PINT_tran_tbl_s ST_io_getattr_trtbl[];
  146 
  147 static PINT_sm_action io_datafile_setup_msgpairs(
  148 	struct PINT_smcb *smcb, job_status_s *js_p);
  149 
  150 static struct PINT_state_s ST_io_datafile_setup_msgpairs;
  151 static struct PINT_pjmp_tbl_s ST_io_datafile_setup_msgpairs_pjtbl[];
  152 static struct PINT_tran_tbl_s ST_io_datafile_setup_msgpairs_trtbl[];
  153 static struct PINT_state_s ST_small_io;
  154 static struct PINT_pjmp_tbl_s ST_small_io_pjtbl[];
  155 static struct PINT_tran_tbl_s ST_small_io_trtbl[];
  156 
  157 static PINT_sm_action io_datafile_post_msgpairs(
  158 	struct PINT_smcb *smcb, job_status_s *js_p);
  159 
  160 static struct PINT_state_s ST_io_datafile_post_msgpairs;
  161 static struct PINT_pjmp_tbl_s ST_io_datafile_post_msgpairs_pjtbl[];
  162 static struct PINT_tran_tbl_s ST_io_datafile_post_msgpairs_trtbl[];
  163 
  164 static PINT_sm_action io_datafile_post_msgpairs_retry(
  165 	struct PINT_smcb *smcb, job_status_s *js_p);
  166 
  167 static struct PINT_state_s ST_io_datafile_post_msgpairs_retry;
  168 static struct PINT_pjmp_tbl_s ST_io_datafile_post_msgpairs_retry_pjtbl[];
  169 static struct PINT_tran_tbl_s ST_io_datafile_post_msgpairs_retry_trtbl[];
  170 
  171 static PINT_sm_action io_datafile_complete_operations(
  172 	struct PINT_smcb *smcb, job_status_s *js_p);
  173 
  174 static struct PINT_state_s ST_io_datafile_complete_operations;
  175 static struct PINT_pjmp_tbl_s ST_io_datafile_complete_operations_pjtbl[];
  176 static struct PINT_tran_tbl_s ST_io_datafile_complete_operations_trtbl[];
  177 
  178 static PINT_sm_action io_analyze_results(
  179 	struct PINT_smcb *smcb, job_status_s *js_p);
  180 
  181 static struct PINT_state_s ST_io_analyze_results;
  182 static struct PINT_pjmp_tbl_s ST_io_analyze_results_pjtbl[];
  183 static struct PINT_tran_tbl_s ST_io_analyze_results_trtbl[];
  184 static struct PINT_state_s ST_io_datafile_size;
  185 static struct PINT_pjmp_tbl_s ST_io_datafile_size_pjtbl[];
  186 static struct PINT_tran_tbl_s ST_io_datafile_size_trtbl[];
  187 
  188 static PINT_sm_action io_analyze_size_results(
  189 	struct PINT_smcb *smcb, job_status_s *js_p);
  190 
  191 static struct PINT_state_s ST_io_analyze_size_results;
  192 static struct PINT_pjmp_tbl_s ST_io_analyze_size_results_pjtbl[];
  193 static struct PINT_tran_tbl_s ST_io_analyze_size_results_trtbl[];
  194 
  195 static PINT_sm_action io_cleanup(
  196 	struct PINT_smcb *smcb, job_status_s *js_p);
  197 
  198 static struct PINT_state_s ST_io_cleanup;
  199 static struct PINT_pjmp_tbl_s ST_io_cleanup_pjtbl[];
  200 static struct PINT_tran_tbl_s ST_io_cleanup_trtbl[];
  201 
/*
 * State machine driving client-side read/write (generated from
 * sys-io.sm; edit the .sm file rather than these tables).
 *
 * Transition summary as encoded in the tables below.  Entries with
 * return_value -1 are presumably the catch-all ("default") transition --
 * confirm against the state machine runtime.
 *
 *   init                             -1 -> io_getattr
 *   io_getattr                        0 -> io_datafile_setup_msgpairs
 *                                    -1 -> io_cleanup
 *   io_datafile_setup_msgpairs       IO_NO_DATA     -> io_cleanup
 *                                    IO_DO_SMALL_IO -> small_io
 *                                     0 -> io_datafile_post_msgpairs
 *                                    -1 -> io_cleanup
 *   small_io                          0 -> io_analyze_results
 *                                    -1 -> io_cleanup
 *   io_datafile_post_msgpairs        IO_RETRY -> io_datafile_post_msgpairs_retry
 *                                    -1 -> io_datafile_complete_operations
 *   io_datafile_post_msgpairs_retry  IO_DATAFILE_TRANSFERS_COMPLETE
 *                                       -> io_analyze_results
 *                                    -1 -> io_datafile_post_msgpairs
 *   io_datafile_complete_operations  IO_DATAFILE_TRANSFERS_COMPLETE
 *                                       -> io_analyze_results
 *                                    IO_RETRY -> io_datafile_post_msgpairs_retry
 *                                    -1 -> io_datafile_complete_operations
 *   io_analyze_results               IO_RETRY -> init
 *                                    IO_ANALYZE_SIZE_RESULTS
 *                                       -> io_analyze_size_results
 *                                    IO_GET_DATAFILE_SIZE -> io_datafile_size
 *                                    -1 -> io_cleanup
 *   io_datafile_size                  0 -> io_analyze_size_results
 *                                    -1 -> io_cleanup
 *   io_analyze_size_results          -1 -> io_cleanup
 *   io_cleanup                       -1 -> SM_TERM
 */
struct PINT_state_machine_s pvfs2_client_io_sm = {
	.name = "pvfs2_client_io_sm",
	.first_state = &ST_init
};

/* init: runs io_init(); first state of the machine */
static struct PINT_state_s ST_init = {
	 .state_name = "init" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_RUN ,
	 .action.func = io_init ,
	 .pjtbl = NULL ,
	 .trtbl = ST_init_trtbl
};

static struct PINT_tran_tbl_s ST_init_trtbl[] = {
	{ .return_value = -1 ,
	 .next_state = &ST_io_getattr }
};

/* io_getattr: SM_JUMP state whose action is the nested getattr machine */
static struct PINT_state_s ST_io_getattr = {
	 .state_name = "io_getattr" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_JUMP ,
	 .action.nested = &pvfs2_client_getattr_sm ,
	 .pjtbl = NULL ,
	 .trtbl = ST_io_getattr_trtbl
};

static struct PINT_tran_tbl_s ST_io_getattr_trtbl[] = {
	{ .return_value = 0 ,
	 .next_state = &ST_io_datafile_setup_msgpairs },
	{ .return_value = -1 ,
	 .next_state = &ST_io_cleanup }
};

/* io_datafile_setup_msgpairs: runs io_datafile_setup_msgpairs() */
static struct PINT_state_s ST_io_datafile_setup_msgpairs = {
	 .state_name = "io_datafile_setup_msgpairs" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_RUN ,
	 .action.func = io_datafile_setup_msgpairs ,
	 .pjtbl = NULL ,
	 .trtbl = ST_io_datafile_setup_msgpairs_trtbl
};

static struct PINT_tran_tbl_s ST_io_datafile_setup_msgpairs_trtbl[] = {
	{ .return_value = IO_NO_DATA ,
	 .next_state = &ST_io_cleanup },
	{ .return_value = IO_DO_SMALL_IO ,
	 .next_state = &ST_small_io },
	{ .return_value = 0 ,
	 .next_state = &ST_io_datafile_post_msgpairs },
	{ .return_value = -1 ,
	 .next_state = &ST_io_cleanup }
};

/* small_io: SM_JUMP state whose action is the nested small I/O machine */
static struct PINT_state_s ST_small_io = {
	 .state_name = "small_io" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_JUMP ,
	 .action.nested = &pvfs2_client_small_io_sm ,
	 .pjtbl = NULL ,
	 .trtbl = ST_small_io_trtbl
};

static struct PINT_tran_tbl_s ST_small_io_trtbl[] = {
	{ .return_value = 0 ,
	 .next_state = &ST_io_analyze_results },
	{ .return_value = -1 ,
	 .next_state = &ST_io_cleanup }
};

/* io_datafile_post_msgpairs: runs io_datafile_post_msgpairs() */
static struct PINT_state_s ST_io_datafile_post_msgpairs = {
	 .state_name = "io_datafile_post_msgpairs" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_RUN ,
	 .action.func = io_datafile_post_msgpairs ,
	 .pjtbl = NULL ,
	 .trtbl = ST_io_datafile_post_msgpairs_trtbl
};

static struct PINT_tran_tbl_s ST_io_datafile_post_msgpairs_trtbl[] = {
	{ .return_value = IO_RETRY ,
	 .next_state = &ST_io_datafile_post_msgpairs_retry },
	{ .return_value = -1 ,
	 .next_state = &ST_io_datafile_complete_operations }
};

/* io_datafile_post_msgpairs_retry: runs io_datafile_post_msgpairs_retry() */
static struct PINT_state_s ST_io_datafile_post_msgpairs_retry = {
	 .state_name = "io_datafile_post_msgpairs_retry" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_RUN ,
	 .action.func = io_datafile_post_msgpairs_retry ,
	 .pjtbl = NULL ,
	 .trtbl = ST_io_datafile_post_msgpairs_retry_trtbl
};

static struct PINT_tran_tbl_s ST_io_datafile_post_msgpairs_retry_trtbl[] = {
	{ .return_value = IO_DATAFILE_TRANSFERS_COMPLETE ,
	 .next_state = &ST_io_analyze_results },
	{ .return_value = -1 ,
	 .next_state = &ST_io_datafile_post_msgpairs }
};

/*
 * io_datafile_complete_operations: runs io_datafile_complete_operations();
 * its -1 transition loops back to this same state.
 */
static struct PINT_state_s ST_io_datafile_complete_operations = {
	 .state_name = "io_datafile_complete_operations" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_RUN ,
	 .action.func = io_datafile_complete_operations ,
	 .pjtbl = NULL ,
	 .trtbl = ST_io_datafile_complete_operations_trtbl
};

static struct PINT_tran_tbl_s ST_io_datafile_complete_operations_trtbl[] = {
	{ .return_value = IO_DATAFILE_TRANSFERS_COMPLETE ,
	 .next_state = &ST_io_analyze_results },
	{ .return_value = IO_RETRY ,
	 .next_state = &ST_io_datafile_post_msgpairs_retry },
	{ .return_value = -1 ,
	 .next_state = &ST_io_datafile_complete_operations }
};

/* io_analyze_results: runs io_analyze_results(); IO_RETRY restarts at init */
static struct PINT_state_s ST_io_analyze_results = {
	 .state_name = "io_analyze_results" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_RUN ,
	 .action.func = io_analyze_results ,
	 .pjtbl = NULL ,
	 .trtbl = ST_io_analyze_results_trtbl
};

static struct PINT_tran_tbl_s ST_io_analyze_results_trtbl[] = {
	{ .return_value = IO_RETRY ,
	 .next_state = &ST_init },
	{ .return_value = IO_ANALYZE_SIZE_RESULTS ,
	 .next_state = &ST_io_analyze_size_results },
	{ .return_value = IO_GET_DATAFILE_SIZE ,
	 .next_state = &ST_io_datafile_size },
	{ .return_value = -1 ,
	 .next_state = &ST_io_cleanup }
};

/*
 * io_datafile_size: SM_JUMP state whose action is the nested
 * datafile getattr sizes machine
 */
static struct PINT_state_s ST_io_datafile_size = {
	 .state_name = "io_datafile_size" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_JUMP ,
	 .action.nested = &pvfs2_client_datafile_getattr_sizes_sm ,
	 .pjtbl = NULL ,
	 .trtbl = ST_io_datafile_size_trtbl
};

static struct PINT_tran_tbl_s ST_io_datafile_size_trtbl[] = {
	{ .return_value = 0 ,
	 .next_state = &ST_io_analyze_size_results },
	{ .return_value = -1 ,
	 .next_state = &ST_io_cleanup }
};

/* io_analyze_size_results: runs io_analyze_size_results() */
static struct PINT_state_s ST_io_analyze_size_results = {
	 .state_name = "io_analyze_size_results" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_RUN ,
	 .action.func = io_analyze_size_results ,
	 .pjtbl = NULL ,
	 .trtbl = ST_io_analyze_size_results_trtbl
};

static struct PINT_tran_tbl_s ST_io_analyze_size_results_trtbl[] = {
	{ .return_value = -1 ,
	 .next_state = &ST_io_cleanup }
};

/* io_cleanup: runs io_cleanup(); its only transition carries SM_TERM */
static struct PINT_state_s ST_io_cleanup = {
	 .state_name = "io_cleanup" ,
	 .parent_machine = &pvfs2_client_io_sm ,
	 .flag = SM_RUN ,
	 .action.func = io_cleanup ,
	 .pjtbl = NULL ,
	 .trtbl = ST_io_cleanup_trtbl
};

static struct PINT_tran_tbl_s ST_io_cleanup_trtbl[] = {
	{ .return_value = -1 ,

	 .flag = SM_TERM }
};
  387 
  388 # 216 "src/client/sysint/sys-io.sm"
  389 
  390 
  391 /** Initiate a read or write operation.
  392  *
  393  *  \param type specifies if the operation is a read or write.
  394  */
  395 PVFS_error PVFS_isys_io(
  396     PVFS_object_ref ref,
  397     PVFS_Request file_req,
  398     PVFS_offset file_req_offset,
  399     void *buffer,
  400     PVFS_Request mem_req,
  401     const PVFS_credentials *credentials,
  402     PVFS_sysresp_io *resp_p,
  403     enum PVFS_io_type io_type,
  404     PVFS_sys_op_id *op_id,
  405     void *user_ptr)
  406 {
  407     PVFS_error ret = -PVFS_EINVAL;
  408     PINT_smcb *smcb = NULL;
  409     PINT_client_sm *sm_p = NULL;
  410     struct filesystem_configuration_s* cur_fs = NULL;
  411     struct server_configuration_s *server_config = NULL;
  412 
  413     gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_io entered [%llu]\n",
  414                  llu(ref.handle));
  415 
  416     if ((ref.handle == PVFS_HANDLE_NULL) ||
  417         (ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
  418     {
  419         gossip_err("invalid (NULL) required argument\n");
  420         return ret;
  421     }
  422 
  423     if ((io_type != PVFS_IO_READ) && (io_type != PVFS_IO_WRITE))
  424     {
  425         gossip_err("invalid (unknown) I/O type specified\n");
  426         return ret;
  427     }
  428 
  429     server_config = PINT_get_server_config_struct(ref.fs_id);
  430     cur_fs = PINT_config_find_fs_id(server_config, ref.fs_id);
  431     PINT_put_server_config_struct(server_config);
  432 
  433     if (!cur_fs)
  434     {
  435         gossip_err("invalid (unknown) fs id specified\n");
  436         return ret;
  437     }
  438 
  439     /* look for zero byte operations */
  440     if ((PINT_REQUEST_TOTAL_BYTES(mem_req) == 0) ||
  441         (PINT_REQUEST_TOTAL_BYTES(file_req) == 0))
  442     {
  443         gossip_ldebug(GOSSIP_IO_DEBUG, "Warning: 0 byte I/O operation "
  444                       "attempted.\n");
  445         resp_p->total_completed = 0;
  446         return 1;
  447     }
  448 
  449     PINT_smcb_alloc(&smcb, PVFS_SYS_IO,
  450              sizeof(struct PINT_client_sm),
  451              client_op_state_get_machine,
  452              client_state_machine_terminate,
  453              pint_client_sm_context);
  454     if (smcb == NULL)
  455     {
  456         return -PVFS_ENOMEM;
  457     }
  458     sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  459 
  460     PINT_init_msgarray_params(&sm_p->msgarray_params, ref.fs_id);
  461     PINT_init_sysint_credentials(sm_p->cred_p, credentials);
  462     sm_p->u.io.io_type = io_type;
  463     sm_p->u.io.file_req = file_req;
  464     sm_p->u.io.file_req_offset = file_req_offset;
  465     sm_p->u.io.io_resp_p = resp_p;
  466     sm_p->u.io.mem_req = mem_req;
  467     sm_p->u.io.buffer = buffer;
  468     sm_p->u.io.flowproto_type = cur_fs->flowproto;
  469     sm_p->u.io.encoding = cur_fs->encoding;
  470     sm_p->u.io.stored_error_code = 0;
  471     sm_p->u.io.retry_count = 0;
  472     sm_p->msgarray = NULL;
  473     sm_p->u.io.datafile_index_array = NULL;
  474     sm_p->u.io.datafile_count = 0;
  475     sm_p->u.io.total_size = 0;
  476     sm_p->u.io.small_io = 0;
  477     sm_p->object_ref = ref;
  478 
  479     return PINT_client_state_machine_post(
  480         smcb,  op_id, user_ptr);
  481 }
  482 
  483 /** Perform a read or write operation.
  484  *
  485  *  \param type specifies if the operation is a read or write.
  486  */
  487 PVFS_error PVFS_sys_io(
  488     PVFS_object_ref ref,
  489     PVFS_Request file_req,
  490     PVFS_offset file_req_offset,
  491     void *buffer,
  492     PVFS_Request mem_req,
  493     const PVFS_credentials *credentials,
  494     PVFS_sysresp_io *resp_p,
  495     enum PVFS_io_type io_type)
  496 {
  497     PVFS_error ret = -PVFS_EINVAL, error = 0;
  498     PVFS_sys_op_id op_id;
  499 
  500     gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_io entered\n");
  501 
  502     ret = PVFS_isys_io(ref, file_req, file_req_offset, buffer, mem_req,
  503                        credentials, resp_p, io_type, &op_id, NULL);
  504     if (ret == 1)
  505         return 0;
  506     else if (ret < 0)
  507     {
  508         PVFS_perror_gossip("PVFS_isys_io call", ret);
  509         error = ret;
  510     }
  511     else
  512     {
  513         ret = PVFS_sys_wait(op_id, "io", &error);
  514         if (ret)
  515         {
  516             PVFS_perror_gossip("PVFS_sys_wait call", ret);
  517             error = ret;
  518         }
  519         PINT_sys_release(op_id);
  520     }
  521 
  522     return error;
  523 }
  524 
  525 /*******************************************************************/
  526 
  527 static PINT_sm_action io_init(
  528         struct PINT_smcb *smcb, job_status_s *js_p)
  529 {
  530     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  531     job_id_t tmp_id;
  532 
  533     gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: io_init\n", sm_p);
  534 
  535     assert((js_p->error_code == 0) ||
  536            (js_p->error_code == IO_RETRY));
  537 
  538     PINT_SM_GETATTR_STATE_FILL(
  539         sm_p->getattr,
  540         sm_p->object_ref,
  541         PVFS_ATTR_META_ALL|PVFS_ATTR_COMMON_TYPE,
  542         PVFS_TYPE_METAFILE,
  543         0);
  544 
  545     if (js_p->error_code == IO_RETRY ||
  546         (js_p->error_code == IO_RETRY_NODELAY))
  547     {
  548         js_p->error_code = 0;
  549 
  550         io_datafile_index_array_destroy(sm_p);
  551         io_contexts_destroy(sm_p);
  552 
  553         if (PINT_smcb_cancelled(smcb))
  554         {
  555             js_p->error_code = -PVFS_ECANCEL;
  556             return SM_ACTION_COMPLETE;
  557         }
  558 
  559         if(js_p->error_code == IO_RETRY_NODELAY)
  560         {
  561             gossip_debug(GOSSIP_IO_DEBUG, "  sys-io retrying without delay.\n");
  562             js_p->error_code = 0;
  563             return 1;
  564         }
  565         gossip_debug(GOSSIP_IO_DEBUG, "  sys-io retrying with delay.\n");
  566         return job_req_sched_post_timer(
  567             sm_p->msgarray_params.retry_delay, smcb, 0, js_p, &tmp_id,
  568             pint_client_sm_context);
  569     }
  570     return SM_ACTION_COMPLETE;
  571 }
  572 
  573 static PINT_sm_action io_datafile_setup_msgpairs(
  574         struct PINT_smcb *smcb, job_status_s *js_p)
  575 {
  576     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  577     int ret = -PVFS_EINVAL, i = 0;
  578     PVFS_object_attr *attr = NULL;
  579     int target_datafile_count = 0;
  580     int * sio_array;
  581     int sio_count;
  582 
  583     gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
  584                  "io_datafile_setup_msgpairs\n", sm_p);
  585 
  586     if (PINT_smcb_cancelled(smcb))
  587     {
  588         js_p->error_code = -PVFS_ECANCEL;
  589         goto exit;
  590     }
  591 
  592     js_p->error_code = 0;
  593 
  594     attr = &sm_p->getattr.attr;
  595     assert(attr);
  596 
  597     switch(attr->objtype)
  598     {
  599         case PVFS_TYPE_METAFILE:
  600             assert(attr->mask & PVFS_ATTR_META_DFILES);
  601             assert(attr->mask & PVFS_ATTR_META_DIST);
  602             assert(attr->u.meta.dist_size > 0);
  603             assert(attr->u.meta.dfile_array);
  604             assert(attr->u.meta.dfile_count > 0);
  605             break;
  606         case PVFS_TYPE_DIRECTORY:
  607             js_p->error_code = -PVFS_EISDIR;
  608             goto exit;
  609         default:
  610             js_p->error_code = -PVFS_EBADF;
  611             goto exit;
  612     }
  613     /* cannot write to an immutable file */
  614     if (sm_p->u.io.io_type == PVFS_IO_WRITE
  615         && (attr->u.meta.hint.flags & PVFS_IMMUTABLE_FL))
  616     {
  617         js_p->error_code = -PVFS_EPERM;
  618         goto exit;
  619     }
  620     ret = PINT_dist_lookup(attr->u.meta.dist);
  621     if (ret)
  622     {
  623         PVFS_perror_gossip("PINT_dist_lookup failed; aborting I/O", ret);
  624         js_p->error_code = -PVFS_EBADF;
  625         goto exit;
  626     }
  627 
  628     ret = io_datafile_index_array_init(sm_p, attr->u.meta.dfile_count);
  629     if(ret < 0)
  630     {
  631         js_p->error_code = ret;
  632         goto error_exit;
  633     }
  634 
  635     PINT_SM_DATAFILE_SIZE_ARRAY_INIT(
  636         &sm_p->u.io.dfile_size_array,
  637         attr->u.meta.dfile_count);
  638 
  639     /* initialize the array of indexes to datafiles in the file request
  640      * that have requests small enough to do small I/O
  641      * (pack data in unexpected message)
  642      */
  643     sio_array = malloc(sizeof(int) * attr->u.meta.dfile_count);
  644     if(!sio_array)
  645     {
  646         js_p->error_code = -PVFS_ENOMEM;
  647         goto datafile_index_array_destroy;
  648     }
  649 
  650     ret = io_find_target_datafiles(
  651         sm_p->u.io.mem_req,
  652         sm_p->u.io.file_req,
  653         sm_p->u.io.file_req_offset,
  654         attr->u.meta.dist,
  655         sm_p->getattr.object_ref.fs_id,
  656         sm_p->u.io.io_type,
  657         attr->u.meta.dfile_array,
  658         attr->u.meta.dfile_count,
  659         sm_p->u.io.datafile_index_array,
  660         &target_datafile_count,
  661         sio_array,
  662         &sio_count);
  663     if(ret < 0)
  664     {
  665         js_p->error_code = ret;
  666         goto sio_array_destroy;
  667     }
  668 
  669     sm_p->u.io.datafile_count = target_datafile_count;
  670 
  671     if (target_datafile_count == 0)
  672     {
  673         gossip_debug(GOSSIP_IO_DEBUG, "  datafile_setup_msgpairs: no "
  674                      "datafiles have data; aborting\n");
  675 
  676         js_p->error_code = IO_NO_DATA;
  677         goto sio_array_destroy;
  678     }
  679 
  680     gossip_debug(GOSSIP_IO_DEBUG,
  681                  "  %s: %d datafiles "
  682                  "might have data\n", __func__, target_datafile_count);
  683 
  684     /* look at sio_array and sio_count to see if there are any
  685      * servers that we can do small I/O to, instead of setting up
  686      * flows.  For now, we're going to stick with the semantics that
  687      * small I/O is only done if all of the sizes for the target datafiles
  688      * are small enough (sio_count == target_datafile_count).  This can
  689      * be changed in the future, for example, if sio_count is some
  690      * percentage of the target_datafile_count, then do small I/O to
  691      * the sio_array servers, etc.
  692      */
  693     if(sio_count == target_datafile_count)
  694     {
  695         gossip_debug(GOSSIP_IO_DEBUG, "  %s: doing small I/O\n", __func__);
  696 
  697         sm_p->u.io.small_io = 1;
  698         js_p->error_code = IO_DO_SMALL_IO;
  699         goto sio_array_destroy;
  700     }
  701 
  702     ret = io_contexts_init(sm_p, target_datafile_count, attr);
  703     if(ret < 0)
  704     {
  705         js_p->error_code = ret;
  706         goto sio_array_destroy;
  707     }
  708 
  709     sm_p->u.io.total_cancellations_remaining = 0;
  710 
  711     /* initialize all per server I/O operation contexts and requests */
  712     for(i = 0; i < target_datafile_count; i++)
  713     {
  714         gossip_debug(GOSSIP_IO_DEBUG, "  filling I/O request "
  715                      "for %llu\n", llu(sm_p->u.io.contexts[i].data_handle));
  716 
  717         PINT_SERVREQ_IO_FILL(
  718             sm_p->u.io.contexts[i].msg.req,
  719             *sm_p->cred_p,
  720             sm_p->object_ref.fs_id,
  721             sm_p->u.io.contexts[i].data_handle,
  722             sm_p->u.io.io_type,
  723             sm_p->u.io.flowproto_type,
  724             sm_p->u.io.datafile_index_array[i],
  725             attr->u.meta.dfile_count,
  726             attr->u.meta.dist,
  727             sm_p->u.io.file_req,
  728             sm_p->u.io.file_req_offset,
  729             PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req));
  730     }
  731 
  732     js_p->error_code = 0;
  733 
  734 sio_array_destroy:
  735     free(sio_array);
  736 
  737     goto exit;
  738 
  739 datafile_index_array_destroy:
  740     io_datafile_index_array_destroy(sm_p);
  741 error_exit:
  742 exit:
  743     return SM_ACTION_COMPLETE;
  744 }
  745 
  746 /*
  747   This is based on msgpairarray_post() in msgpairarray.c.  It's
  748   different enough in that we don't have to wait on the msgpairarray
  749   operations to all complete before posting flows as we can do so for each
  750   server individually when we're ready.  this avoids the msgpairarray
  751   sync point implicit in the design
  752 */
  753 static PINT_sm_action io_datafile_post_msgpairs(
  754         struct PINT_smcb *smcb, job_status_s *js_p)
  755 {
  756     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  757     int ret = -PVFS_EINVAL, i = 0;
  758     unsigned long status_user_tag = 0;
  759     int must_loop_encodings = 0;
  760     struct server_configuration_s *server_config = NULL;
  761 
  762     gossip_debug(GOSSIP_CLIENT_DEBUG, "io_datafile_post_msgpairs "
  763                  "state: post (%d message(s))\n", sm_p->u.io.datafile_count);
  764 
  765     if (PINT_smcb_cancelled(smcb))
  766     {
  767         js_p->error_code = -PVFS_ECANCEL;
  768         return SM_ACTION_COMPLETE;
  769     }
  770 
  771     js_p->error_code = 0;
  772 
  773     /* completion count tracks sends/recvs separately, will increment
  774      * as we go through the loop to maintain a count of outstanding msgpairs */
  775     sm_p->u.io.msgpair_completion_count = 0;
  776 
  777     for(i = 0; i < sm_p->u.io.context_count; i++)
  778     {
  779         PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
  780         PINT_sm_msgpair_state *msg = &cur_ctx->msg;
  781 
  782         /* do not do this one again in retry case */
  783         if (cur_ctx->msg_recv_has_been_posted &&
  784             cur_ctx->msg_recv_in_progress)
  785         {
  786             ++sm_p->u.io.msgpair_completion_count;
  787             goto recv_already_posted;
  788         }
  789 
  790         if (!ENCODING_IS_VALID(sm_p->u.io.encoding))
  791         {
  792             PRINT_ENCODING_ERROR("supported", sm_p->u.io.encoding);
  793             must_loop_encodings = 1;
  794             sm_p->u.io.encoding = (ENCODING_INVALID_MIN + 1);
  795         }
  796         else if (!ENCODING_IS_SUPPORTED(sm_p->u.io.encoding))
  797         {
  798             PRINT_ENCODING_ERROR("supported", sm_p->u.io.encoding);
  799             must_loop_encodings = 1;
  800             sm_p->u.io.encoding = ENCODING_SUPPORTED_MIN;
  801         }
  802 
  803       try_next_encoding:
  804         assert(ENCODING_IS_VALID(sm_p->u.io.encoding));
  805 
  806         ret = PINT_encode(&msg->req, PINT_ENCODE_REQ, &msg->encoded_req,
  807                           msg->svr_addr, sm_p->u.io.encoding);
  808         if (ret)
  809         {
  810             if (must_loop_encodings)
  811             {
  812                 gossip_debug(GOSSIP_CLIENT_DEBUG, "Looping through "
  813                              "encodings [%d/%d]\n", sm_p->u.io.encoding,
  814                              ENCODING_INVALID_MAX);
  815 
  816                 sm_p->u.io.encoding++;
  817                 if (ENCODING_IS_VALID(sm_p->u.io.encoding))
  818                 {
  819                     goto try_next_encoding;
  820                 }
  821             }
  822             /*
  823               FIXME: make this a clean error transition by adjusting
  824               the completion count and/or (not) exiting
  825             */
  826             PVFS_perror_gossip("PINT_encode failed", ret);
  827             js_p->error_code = ret;
  828             return SM_ACTION_COMPLETE;
  829         }
  830 
  831         /* calculate maximum response message size and allocate it */
  832         msg->max_resp_sz = PINT_encode_calc_max_size(
  833             PINT_ENCODE_RESP, msg->req.op, sm_p->u.io.encoding);
  834         msg->encoded_resp_p = BMI_memalloc(
  835             msg->svr_addr, msg->max_resp_sz, BMI_RECV);
  836         if (!msg->encoded_resp_p)
  837         {
  838             /* FIXME: see above FIXME */
  839             js_p->error_code = -PVFS_ENOMEM;
  840             return SM_ACTION_COMPLETE;
  841         }
  842 
  843         /*
  844           recalculate the status user tag based on this the progress
  845           of the current context like this: status_user_tag = (4 *
  846           (context index) + context phase)
  847         */
  848         assert(cur_ctx->index == i);
  849         status_user_tag = ((4 * i) + IO_SM_PHASE_REQ_MSGPAIR_RECV);
  850 
  851         gossip_debug(GOSSIP_IO_DEBUG," posting recv with "
  852                      "status_user_tag=%lu (max_size %d)\n",
  853                      status_user_tag, msg->max_resp_sz);
  854 
  855         cur_ctx->session_tag = PINT_util_get_next_tag();
  856 
  857         cur_ctx->msg_recv_has_been_posted = 0;
  858         cur_ctx->msg_recv_in_progress = 0;
  859 
  860         server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
  861         ret = job_bmi_recv(
  862             msg->svr_addr, msg->encoded_resp_p, msg->max_resp_sz,
  863             cur_ctx->session_tag, BMI_PRE_ALLOC, smcb, status_user_tag,
  864             &msg->recv_status, &msg->recv_id, pint_client_sm_context,
  865             server_config->client_job_bmi_timeout);
  866         PINT_put_server_config_struct(server_config);
  867 
  868         /* ret -1: problem, do not look at msg recv_status */
  869         /* ret 1: immediate completion, see status */
  870         /* ret 0: okay */
  871 
  872         if (ret < 0) {
  873             PVFS_perror_gossip("Post of receive failed", ret);
  874             js_p->error_code = ret;
  875             continue;
  876 
  877         }
  878 
  879         if (ret == 0) {
  880             int tmp = 0;
  881             /* perform a quick test to see if the recv failed before
  882              * posting the send; if it reports an error quickly then
  883              * we can save the confusion of sending a request for
  884              * which we can't recv a response
  885              */
  886             ret = job_test(msg->recv_id, &tmp, NULL,
  887                            &msg->recv_status, 0,
  888                            pint_client_sm_context);
  889             if (ret < 0) {
  890                 PVFS_perror_gossip("Post of receive failed", ret);
  891                 js_p->error_code = ret;
  892                 continue;
  893             }
  894         }
  895 
  896         /* either from job_bmi_recv or from job_test finding something */
  897         if (ret == 1) {
  898             /*
  899              * This recv must have completed with an error because the
  900              * server has not yet been sent our request.
  901              */
  902             PVFS_perror_gossip("Receive immediately failed",
  903                                msg->recv_status.error_code);
  904 
  905             ret = msg->recv_status.error_code;
  906             js_p->error_code = ret;
  907             continue;
  908         }
  909 
  910         cur_ctx->msg_recv_has_been_posted = 1;
  911         cur_ctx->msg_recv_in_progress = 1;
  912 
  913         /* posted the receive okay */
  914         ++sm_p->u.io.msgpair_completion_count;
  915 
  916       recv_already_posted:
  917 
  918         if (cur_ctx->msg_send_has_been_posted &&
  919             cur_ctx->msg_send_in_progress)
  920         {
  921             ++sm_p->u.io.msgpair_completion_count;
  922             continue;
  923         }
  924 
  925         status_user_tag = ((4 * i) + IO_SM_PHASE_REQ_MSGPAIR_SEND);
  926 
  927         cur_ctx->msg_send_has_been_posted = 0;
  928         cur_ctx->msg_send_in_progress = 0;
  929 
  930         gossip_debug(GOSSIP_IO_DEBUG," posting send with "
  931                      "status_user_tag=%lu\n", status_user_tag);
  932 
  933         server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
  934         ret = job_bmi_send_list(
  935             msg->encoded_req.dest, msg->encoded_req.buffer_list,
  936             msg->encoded_req.size_list, msg->encoded_req.list_count,
  937             msg->encoded_req.total_size, cur_ctx->session_tag,
  938             msg->encoded_req.buffer_type, 1, smcb, status_user_tag,
  939             &msg->send_status, &msg->send_id, pint_client_sm_context,
  940             server_config->client_job_bmi_timeout);
  941         PINT_put_server_config_struct(server_config);
  942 
  943         if (ret < 0) {
  944             PVFS_perror_gossip("Post of send failed, cancelling recv", ret);
  945             msg->op_status = msg->send_status.error_code;
  946             msg->send_id = 0;
  947             job_bmi_cancel(msg->recv_id, pint_client_sm_context);
  948 
  949             js_p