"SfR Fresh" - the SfR Freeware/Shareware Archive 
Member "pvfs-2.7.1/src/client/sysint/sys-io.c" of archive pvfs-2.7.1.tar.gz:
As a special service, "SfR Fresh" has tried to format the requested source page as HTML, using (guessed) C and C++ source-code syntax highlighting with prefixed line numbers.
Alternatively, you can view or download the uninterpreted source code file here.
The same can be done for any archive member file by clicking, within an archive contents listing, either on the first character of the file (path) or on the corresponding byte-size field.
1 /* WARNING: THIS FILE IS AUTOMATICALLY GENERATED FROM A .SM FILE.
2 * Changes made here will certainly be overwritten.
3 */
4
5 /*
6 * (C) 2003 Clemson University and The University of Chicago
7 *
8 * See COPYING in top-level directory.
9 */
10
11 /** \file
12 * \ingroup sysint
13 *
14 * PVFS2 system interface routines for reading and writing files.
15 */
16
17 #include <string.h>
18 #include <assert.h>
19
20 #include "client-state-machine.h"
21 #include "pvfs2-debug.h"
22 #include "job.h"
23 #include "gossip.h"
24 #include "str-utils.h"
25 #include "pint-cached-config.h"
26 #include "PINT-reqproto-encode.h"
27 #include "pint-util.h"
28 #include "pvfs2-internal.h"
29
30 #define IO_MAX_SEGMENT_NUM 50
31
32 extern job_context_id pint_client_sm_context;
33
/*
 * Private return codes that the state actions below place in
 * js_p->error_code; the transition tables dispatch on them.
 * Values are pinned explicitly and start at 132 (presumably to stay clear
 * of other state return values -- TODO confirm).
 */
enum
{
    IO_NO_DATA                     = 132,
    IO_DATAFILE_TRANSFERS_COMPLETE = 133,
    IO_RETRY                       = 134,
    IO_RETRY_NODELAY               = 135,
    IO_GET_DATAFILE_SIZE           = 136,
    IO_ANALYZE_SIZE_RESULTS        = 137,
    IO_DO_SMALL_IO                 = 138
};
44
45 /* Helper functions local to sys-io.sm. */
46
47 static inline int io_complete_context_send_or_recv(
48 PINT_smcb *smcb, job_status_s *js_p);
49
50 static inline int io_decode_ack_response(
51 PINT_client_io_ctx *cur_ctx,
52 struct PINT_decoded_msg *decoded_resp,
53 struct PVFS_server_resp **resp);
54
55 static inline int io_post_flow(
56 PINT_smcb *smcb, PINT_client_io_ctx *cur_ctx);
57
58 static inline int io_post_write_ack_recv(
59 PINT_smcb *smcb, PINT_client_io_ctx * cur_ctx);
60
61 static inline int io_process_context_recv(
62 PINT_client_sm *sm_p, job_status_s *js_p, PINT_client_io_ctx **out_ctx);
63
64 static inline int io_check_context_status(
65 PINT_client_io_ctx *cur_ctx, int io_type,
66 PVFS_size *total_size);
67
68 static int io_find_target_datafiles(
69 PVFS_Request mem_req,
70 PVFS_Request file_req,
71 PVFS_offset file_req_offset,
72 PINT_dist *dist_p,
73 PVFS_fs_id fs_id,
74 enum PVFS_io_type io_type,
75 PVFS_handle *input_handle_array,
76 int input_handle_count,
77 int *handle_index_array,
78 int *handle_index_out_count,
79 int *sio_handle_index_array,
80 int *sio_handle_index_count);
81
82 static int io_find_total_size(
83 PINT_client_sm * sm_p,
84 PVFS_offset final_offset,
85 PVFS_size * total_return_size);
86
87 static int io_find_offset(
88 PINT_client_sm * sm_p,
89 PVFS_size contig_size,
90 PVFS_size * total_return_offset);
91
92 static int io_get_max_unexp_size(
93 struct PINT_Request * file_req,
94 PVFS_handle handle,
95 PVFS_fs_id fs_id,
96 enum PVFS_io_type type,
97 int * max_unexp_payload);
98
99 static int io_zero_fill_holes(
100 PINT_client_sm *sm_p,
101 PVFS_size eof,
102 int datafile_count,
103 PVFS_size * datafile_size_array,
104 int * datafile_index_array);
105
106 static int io_contexts_init(PINT_client_sm *sm_p, int count,
107 PVFS_object_attr *attr);
108
109 static void io_contexts_destroy(PINT_client_sm *sm_p);
110
/* misc constants and helper macros */
#define IO_RECV_COMPLETED 1

/*
 * Possible I/O state machine phases (status_user_tag).
 *
 * io_datafile_post_msgpairs() builds each job's status_user_tag as
 * (4 * context_index + phase); the macros below decode such a tag.
 */
#define IO_SM_PHASE_REQ_MSGPAIR_RECV 0
#define IO_SM_PHASE_REQ_MSGPAIR_SEND 1
#define IO_SM_PHASE_FLOW             2
#define IO_SM_PHASE_FINAL_ACK        3
#define IO_SM_NUM_PHASES             4

/*
 * Every macro argument is parenthesized so that expression arguments
 * (e.g. `base + phase`) bind correctly against % and /.
 */

/* Nonzero when `tag` encodes phase `type`. */
#define STATUS_USER_TAG_TYPE(tag, type) \
    (((tag) % IO_SM_NUM_PHASES) == (type))

/* Context index encoded in `tag`. `type` is unused and kept only so existing
 * call sites keep compiling. */
#define STATUS_USER_TAG_GET_INDEX(tag, type) \
    ((tag) / IO_SM_NUM_PHASES)

/* Nonzero when `tag` is a request send or response receive phase.
 * Note: evaluates `tag` twice -- do not pass expressions with side effects. */
#define STATUS_USER_TAG_IS_SEND_OR_RECV(tag) \
    (STATUS_USER_TAG_TYPE((tag), IO_SM_PHASE_REQ_MSGPAIR_RECV) || \
     STATUS_USER_TAG_TYPE((tag), IO_SM_PHASE_REQ_MSGPAIR_SEND))
128
129 static int io_datafile_index_array_init(
130 PINT_client_sm *sm_p,
131 int datafile_count);
132
133 static void io_datafile_index_array_destroy(
134 PINT_client_sm *sm_p);
135
136
137 static PINT_sm_action io_init(
138 struct PINT_smcb *smcb, job_status_s *js_p);
139
140 static struct PINT_state_s ST_init;
141 static struct PINT_pjmp_tbl_s ST_init_pjtbl[];
142 static struct PINT_tran_tbl_s ST_init_trtbl[];
143 static struct PINT_state_s ST_io_getattr;
144 static struct PINT_pjmp_tbl_s ST_io_getattr_pjtbl[];
145 static struct PINT_tran_tbl_s ST_io_getattr_trtbl[];
146
147 static PINT_sm_action io_datafile_setup_msgpairs(
148 struct PINT_smcb *smcb, job_status_s *js_p);
149
150 static struct PINT_state_s ST_io_datafile_setup_msgpairs;
151 static struct PINT_pjmp_tbl_s ST_io_datafile_setup_msgpairs_pjtbl[];
152 static struct PINT_tran_tbl_s ST_io_datafile_setup_msgpairs_trtbl[];
153 static struct PINT_state_s ST_small_io;
154 static struct PINT_pjmp_tbl_s ST_small_io_pjtbl[];
155 static struct PINT_tran_tbl_s ST_small_io_trtbl[];
156
157 static PINT_sm_action io_datafile_post_msgpairs(
158 struct PINT_smcb *smcb, job_status_s *js_p);
159
160 static struct PINT_state_s ST_io_datafile_post_msgpairs;
161 static struct PINT_pjmp_tbl_s ST_io_datafile_post_msgpairs_pjtbl[];
162 static struct PINT_tran_tbl_s ST_io_datafile_post_msgpairs_trtbl[];
163
164 static PINT_sm_action io_datafile_post_msgpairs_retry(
165 struct PINT_smcb *smcb, job_status_s *js_p);
166
167 static struct PINT_state_s ST_io_datafile_post_msgpairs_retry;
168 static struct PINT_pjmp_tbl_s ST_io_datafile_post_msgpairs_retry_pjtbl[];
169 static struct PINT_tran_tbl_s ST_io_datafile_post_msgpairs_retry_trtbl[];
170
171 static PINT_sm_action io_datafile_complete_operations(
172 struct PINT_smcb *smcb, job_status_s *js_p);
173
174 static struct PINT_state_s ST_io_datafile_complete_operations;
175 static struct PINT_pjmp_tbl_s ST_io_datafile_complete_operations_pjtbl[];
176 static struct PINT_tran_tbl_s ST_io_datafile_complete_operations_trtbl[];
177
178 static PINT_sm_action io_analyze_results(
179 struct PINT_smcb *smcb, job_status_s *js_p);
180
181 static struct PINT_state_s ST_io_analyze_results;
182 static struct PINT_pjmp_tbl_s ST_io_analyze_results_pjtbl[];
183 static struct PINT_tran_tbl_s ST_io_analyze_results_trtbl[];
184 static struct PINT_state_s ST_io_datafile_size;
185 static struct PINT_pjmp_tbl_s ST_io_datafile_size_pjtbl[];
186 static struct PINT_tran_tbl_s ST_io_datafile_size_trtbl[];
187
188 static PINT_sm_action io_analyze_size_results(
189 struct PINT_smcb *smcb, job_status_s *js_p);
190
191 static struct PINT_state_s ST_io_analyze_size_results;
192 static struct PINT_pjmp_tbl_s ST_io_analyze_size_results_pjtbl[];
193 static struct PINT_tran_tbl_s ST_io_analyze_size_results_trtbl[];
194
195 static PINT_sm_action io_cleanup(
196 struct PINT_smcb *smcb, job_status_s *js_p);
197
198 static struct PINT_state_s ST_io_cleanup;
199 static struct PINT_pjmp_tbl_s ST_io_cleanup_pjtbl[];
200 static struct PINT_tran_tbl_s ST_io_cleanup_trtbl[];
201
202 struct PINT_state_machine_s pvfs2_client_io_sm = {
203 .name = "pvfs2_client_io_sm",
204 .first_state = &ST_init
205 };
206
207 static struct PINT_state_s ST_init = {
208 .state_name = "init" ,
209 .parent_machine = &pvfs2_client_io_sm ,
210 .flag = SM_RUN ,
211 .action.func = io_init ,
212 .pjtbl = NULL ,
213 .trtbl = ST_init_trtbl
214 };
215
216 static struct PINT_tran_tbl_s ST_init_trtbl[] = {
217 { .return_value = -1 ,
218 .next_state = &ST_io_getattr }
219 };
220
221 static struct PINT_state_s ST_io_getattr = {
222 .state_name = "io_getattr" ,
223 .parent_machine = &pvfs2_client_io_sm ,
224 .flag = SM_JUMP ,
225 .action.nested = &pvfs2_client_getattr_sm ,
226 .pjtbl = NULL ,
227 .trtbl = ST_io_getattr_trtbl
228 };
229
230 static struct PINT_tran_tbl_s ST_io_getattr_trtbl[] = {
231 { .return_value = 0 ,
232 .next_state = &ST_io_datafile_setup_msgpairs },
233 { .return_value = -1 ,
234 .next_state = &ST_io_cleanup }
235 };
236
237 static struct PINT_state_s ST_io_datafile_setup_msgpairs = {
238 .state_name = "io_datafile_setup_msgpairs" ,
239 .parent_machine = &pvfs2_client_io_sm ,
240 .flag = SM_RUN ,
241 .action.func = io_datafile_setup_msgpairs ,
242 .pjtbl = NULL ,
243 .trtbl = ST_io_datafile_setup_msgpairs_trtbl
244 };
245
246 static struct PINT_tran_tbl_s ST_io_datafile_setup_msgpairs_trtbl[] = {
247 { .return_value = IO_NO_DATA ,
248 .next_state = &ST_io_cleanup },
249 { .return_value = IO_DO_SMALL_IO ,
250 .next_state = &ST_small_io },
251 { .return_value = 0 ,
252 .next_state = &ST_io_datafile_post_msgpairs },
253 { .return_value = -1 ,
254 .next_state = &ST_io_cleanup }
255 };
256
257 static struct PINT_state_s ST_small_io = {
258 .state_name = "small_io" ,
259 .parent_machine = &pvfs2_client_io_sm ,
260 .flag = SM_JUMP ,
261 .action.nested = &pvfs2_client_small_io_sm ,
262 .pjtbl = NULL ,
263 .trtbl = ST_small_io_trtbl
264 };
265
266 static struct PINT_tran_tbl_s ST_small_io_trtbl[] = {
267 { .return_value = 0 ,
268 .next_state = &ST_io_analyze_results },
269 { .return_value = -1 ,
270 .next_state = &ST_io_cleanup }
271 };
272
273 static struct PINT_state_s ST_io_datafile_post_msgpairs = {
274 .state_name = "io_datafile_post_msgpairs" ,
275 .parent_machine = &pvfs2_client_io_sm ,
276 .flag = SM_RUN ,
277 .action.func = io_datafile_post_msgpairs ,
278 .pjtbl = NULL ,
279 .trtbl = ST_io_datafile_post_msgpairs_trtbl
280 };
281
282 static struct PINT_tran_tbl_s ST_io_datafile_post_msgpairs_trtbl[] = {
283 { .return_value = IO_RETRY ,
284 .next_state = &ST_io_datafile_post_msgpairs_retry },
285 { .return_value = -1 ,
286 .next_state = &ST_io_datafile_complete_operations }
287 };
288
289 static struct PINT_state_s ST_io_datafile_post_msgpairs_retry = {
290 .state_name = "io_datafile_post_msgpairs_retry" ,
291 .parent_machine = &pvfs2_client_io_sm ,
292 .flag = SM_RUN ,
293 .action.func = io_datafile_post_msgpairs_retry ,
294 .pjtbl = NULL ,
295 .trtbl = ST_io_datafile_post_msgpairs_retry_trtbl
296 };
297
298 static struct PINT_tran_tbl_s ST_io_datafile_post_msgpairs_retry_trtbl[] = {
299 { .return_value = IO_DATAFILE_TRANSFERS_COMPLETE ,
300 .next_state = &ST_io_analyze_results },
301 { .return_value = -1 ,
302 .next_state = &ST_io_datafile_post_msgpairs }
303 };
304
305 static struct PINT_state_s ST_io_datafile_complete_operations = {
306 .state_name = "io_datafile_complete_operations" ,
307 .parent_machine = &pvfs2_client_io_sm ,
308 .flag = SM_RUN ,
309 .action.func = io_datafile_complete_operations ,
310 .pjtbl = NULL ,
311 .trtbl = ST_io_datafile_complete_operations_trtbl
312 };
313
314 static struct PINT_tran_tbl_s ST_io_datafile_complete_operations_trtbl[] = {
315 { .return_value = IO_DATAFILE_TRANSFERS_COMPLETE ,
316 .next_state = &ST_io_analyze_results },
317 { .return_value = IO_RETRY ,
318 .next_state = &ST_io_datafile_post_msgpairs_retry },
319 { .return_value = -1 ,
320 .next_state = &ST_io_datafile_complete_operations }
321 };
322
323 static struct PINT_state_s ST_io_analyze_results = {
324 .state_name = "io_analyze_results" ,
325 .parent_machine = &pvfs2_client_io_sm ,
326 .flag = SM_RUN ,
327 .action.func = io_analyze_results ,
328 .pjtbl = NULL ,
329 .trtbl = ST_io_analyze_results_trtbl
330 };
331
332 static struct PINT_tran_tbl_s ST_io_analyze_results_trtbl[] = {
333 { .return_value = IO_RETRY ,
334 .next_state = &ST_init },
335 { .return_value = IO_ANALYZE_SIZE_RESULTS ,
336 .next_state = &ST_io_analyze_size_results },
337 { .return_value = IO_GET_DATAFILE_SIZE ,
338 .next_state = &ST_io_datafile_size },
339 { .return_value = -1 ,
340 .next_state = &ST_io_cleanup }
341 };
342
343 static struct PINT_state_s ST_io_datafile_size = {
344 .state_name = "io_datafile_size" ,
345 .parent_machine = &pvfs2_client_io_sm ,
346 .flag = SM_JUMP ,
347 .action.nested = &pvfs2_client_datafile_getattr_sizes_sm ,
348 .pjtbl = NULL ,
349 .trtbl = ST_io_datafile_size_trtbl
350 };
351
352 static struct PINT_tran_tbl_s ST_io_datafile_size_trtbl[] = {
353 { .return_value = 0 ,
354 .next_state = &ST_io_analyze_size_results },
355 { .return_value = -1 ,
356 .next_state = &ST_io_cleanup }
357 };
358
359 static struct PINT_state_s ST_io_analyze_size_results = {
360 .state_name = "io_analyze_size_results" ,
361 .parent_machine = &pvfs2_client_io_sm ,
362 .flag = SM_RUN ,
363 .action.func = io_analyze_size_results ,
364 .pjtbl = NULL ,
365 .trtbl = ST_io_analyze_size_results_trtbl
366 };
367
368 static struct PINT_tran_tbl_s ST_io_analyze_size_results_trtbl[] = {
369 { .return_value = -1 ,
370 .next_state = &ST_io_cleanup }
371 };
372
373 static struct PINT_state_s ST_io_cleanup = {
374 .state_name = "io_cleanup" ,
375 .parent_machine = &pvfs2_client_io_sm ,
376 .flag = SM_RUN ,
377 .action.func = io_cleanup ,
378 .pjtbl = NULL ,
379 .trtbl = ST_io_cleanup_trtbl
380 };
381
382 static struct PINT_tran_tbl_s ST_io_cleanup_trtbl[] = {
383 { .return_value = -1 ,
384
385 .flag = SM_TERM }
386 };
387
388 # 216 "src/client/sysint/sys-io.sm"
389
390
391 /** Initiate a read or write operation.
392 *
393 * \param type specifies if the operation is a read or write.
394 */
395 PVFS_error PVFS_isys_io(
396 PVFS_object_ref ref,
397 PVFS_Request file_req,
398 PVFS_offset file_req_offset,
399 void *buffer,
400 PVFS_Request mem_req,
401 const PVFS_credentials *credentials,
402 PVFS_sysresp_io *resp_p,
403 enum PVFS_io_type io_type,
404 PVFS_sys_op_id *op_id,
405 void *user_ptr)
406 {
407 PVFS_error ret = -PVFS_EINVAL;
408 PINT_smcb *smcb = NULL;
409 PINT_client_sm *sm_p = NULL;
410 struct filesystem_configuration_s* cur_fs = NULL;
411 struct server_configuration_s *server_config = NULL;
412
413 gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_io entered [%llu]\n",
414 llu(ref.handle));
415
416 if ((ref.handle == PVFS_HANDLE_NULL) ||
417 (ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
418 {
419 gossip_err("invalid (NULL) required argument\n");
420 return ret;
421 }
422
423 if ((io_type != PVFS_IO_READ) && (io_type != PVFS_IO_WRITE))
424 {
425 gossip_err("invalid (unknown) I/O type specified\n");
426 return ret;
427 }
428
429 server_config = PINT_get_server_config_struct(ref.fs_id);
430 cur_fs = PINT_config_find_fs_id(server_config, ref.fs_id);
431 PINT_put_server_config_struct(server_config);
432
433 if (!cur_fs)
434 {
435 gossip_err("invalid (unknown) fs id specified\n");
436 return ret;
437 }
438
439 /* look for zero byte operations */
440 if ((PINT_REQUEST_TOTAL_BYTES(mem_req) == 0) ||
441 (PINT_REQUEST_TOTAL_BYTES(file_req) == 0))
442 {
443 gossip_ldebug(GOSSIP_IO_DEBUG, "Warning: 0 byte I/O operation "
444 "attempted.\n");
445 resp_p->total_completed = 0;
446 return 1;
447 }
448
449 PINT_smcb_alloc(&smcb, PVFS_SYS_IO,
450 sizeof(struct PINT_client_sm),
451 client_op_state_get_machine,
452 client_state_machine_terminate,
453 pint_client_sm_context);
454 if (smcb == NULL)
455 {
456 return -PVFS_ENOMEM;
457 }
458 sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
459
460 PINT_init_msgarray_params(&sm_p->msgarray_params, ref.fs_id);
461 PINT_init_sysint_credentials(sm_p->cred_p, credentials);
462 sm_p->u.io.io_type = io_type;
463 sm_p->u.io.file_req = file_req;
464 sm_p->u.io.file_req_offset = file_req_offset;
465 sm_p->u.io.io_resp_p = resp_p;
466 sm_p->u.io.mem_req = mem_req;
467 sm_p->u.io.buffer = buffer;
468 sm_p->u.io.flowproto_type = cur_fs->flowproto;
469 sm_p->u.io.encoding = cur_fs->encoding;
470 sm_p->u.io.stored_error_code = 0;
471 sm_p->u.io.retry_count = 0;
472 sm_p->msgarray = NULL;
473 sm_p->u.io.datafile_index_array = NULL;
474 sm_p->u.io.datafile_count = 0;
475 sm_p->u.io.total_size = 0;
476 sm_p->u.io.small_io = 0;
477 sm_p->object_ref = ref;
478
479 return PINT_client_state_machine_post(
480 smcb, op_id, user_ptr);
481 }
482
483 /** Perform a read or write operation.
484 *
485 * \param type specifies if the operation is a read or write.
486 */
487 PVFS_error PVFS_sys_io(
488 PVFS_object_ref ref,
489 PVFS_Request file_req,
490 PVFS_offset file_req_offset,
491 void *buffer,
492 PVFS_Request mem_req,
493 const PVFS_credentials *credentials,
494 PVFS_sysresp_io *resp_p,
495 enum PVFS_io_type io_type)
496 {
497 PVFS_error ret = -PVFS_EINVAL, error = 0;
498 PVFS_sys_op_id op_id;
499
500 gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_io entered\n");
501
502 ret = PVFS_isys_io(ref, file_req, file_req_offset, buffer, mem_req,
503 credentials, resp_p, io_type, &op_id, NULL);
504 if (ret == 1)
505 return 0;
506 else if (ret < 0)
507 {
508 PVFS_perror_gossip("PVFS_isys_io call", ret);
509 error = ret;
510 }
511 else
512 {
513 ret = PVFS_sys_wait(op_id, "io", &error);
514 if (ret)
515 {
516 PVFS_perror_gossip("PVFS_sys_wait call", ret);
517 error = ret;
518 }
519 PINT_sys_release(op_id);
520 }
521
522 return error;
523 }
524
525 /*******************************************************************/
526
527 static PINT_sm_action io_init(
528 struct PINT_smcb *smcb, job_status_s *js_p)
529 {
530 struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
531 job_id_t tmp_id;
532
533 gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: io_init\n", sm_p);
534
535 assert((js_p->error_code == 0) ||
536 (js_p->error_code == IO_RETRY));
537
538 PINT_SM_GETATTR_STATE_FILL(
539 sm_p->getattr,
540 sm_p->object_ref,
541 PVFS_ATTR_META_ALL|PVFS_ATTR_COMMON_TYPE,
542 PVFS_TYPE_METAFILE,
543 0);
544
545 if (js_p->error_code == IO_RETRY ||
546 (js_p->error_code == IO_RETRY_NODELAY))
547 {
548 js_p->error_code = 0;
549
550 io_datafile_index_array_destroy(sm_p);
551 io_contexts_destroy(sm_p);
552
553 if (PINT_smcb_cancelled(smcb))
554 {
555 js_p->error_code = -PVFS_ECANCEL;
556 return SM_ACTION_COMPLETE;
557 }
558
559 if(js_p->error_code == IO_RETRY_NODELAY)
560 {
561 gossip_debug(GOSSIP_IO_DEBUG, " sys-io retrying without delay.\n");
562 js_p->error_code = 0;
563 return 1;
564 }
565 gossip_debug(GOSSIP_IO_DEBUG, " sys-io retrying with delay.\n");
566 return job_req_sched_post_timer(
567 sm_p->msgarray_params.retry_delay, smcb, 0, js_p, &tmp_id,
568 pint_client_sm_context);
569 }
570 return SM_ACTION_COMPLETE;
571 }
572
573 static PINT_sm_action io_datafile_setup_msgpairs(
574 struct PINT_smcb *smcb, job_status_s *js_p)
575 {
576 struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
577 int ret = -PVFS_EINVAL, i = 0;
578 PVFS_object_attr *attr = NULL;
579 int target_datafile_count = 0;
580 int * sio_array;
581 int sio_count;
582
583 gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
584 "io_datafile_setup_msgpairs\n", sm_p);
585
586 if (PINT_smcb_cancelled(smcb))
587 {
588 js_p->error_code = -PVFS_ECANCEL;
589 goto exit;
590 }
591
592 js_p->error_code = 0;
593
594 attr = &sm_p->getattr.attr;
595 assert(attr);
596
597 switch(attr->objtype)
598 {
599 case PVFS_TYPE_METAFILE:
600 assert(attr->mask & PVFS_ATTR_META_DFILES);
601 assert(attr->mask & PVFS_ATTR_META_DIST);
602 assert(attr->u.meta.dist_size > 0);
603 assert(attr->u.meta.dfile_array);
604 assert(attr->u.meta.dfile_count > 0);
605 break;
606 case PVFS_TYPE_DIRECTORY:
607 js_p->error_code = -PVFS_EISDIR;
608 goto exit;
609 default:
610 js_p->error_code = -PVFS_EBADF;
611 goto exit;
612 }
613 /* cannot write to an immutable file */
614 if (sm_p->u.io.io_type == PVFS_IO_WRITE
615 && (attr->u.meta.hint.flags & PVFS_IMMUTABLE_FL))
616 {
617 js_p->error_code = -PVFS_EPERM;
618 goto exit;
619 }
620 ret = PINT_dist_lookup(attr->u.meta.dist);
621 if (ret)
622 {
623 PVFS_perror_gossip("PINT_dist_lookup failed; aborting I/O", ret);
624 js_p->error_code = -PVFS_EBADF;
625 goto exit;
626 }
627
628 ret = io_datafile_index_array_init(sm_p, attr->u.meta.dfile_count);
629 if(ret < 0)
630 {
631 js_p->error_code = ret;
632 goto error_exit;
633 }
634
635 PINT_SM_DATAFILE_SIZE_ARRAY_INIT(
636 &sm_p->u.io.dfile_size_array,
637 attr->u.meta.dfile_count);
638
639 /* initialize the array of indexes to datafiles in the file request
640 * that have requests small enough to do small I/O
641 * (pack data in unexpected message)
642 */
643 sio_array = malloc(sizeof(int) * attr->u.meta.dfile_count);
644 if(!sio_array)
645 {
646 js_p->error_code = -PVFS_ENOMEM;
647 goto datafile_index_array_destroy;
648 }
649
650 ret = io_find_target_datafiles(
651 sm_p->u.io.mem_req,
652 sm_p->u.io.file_req,
653 sm_p->u.io.file_req_offset,
654 attr->u.meta.dist,
655 sm_p->getattr.object_ref.fs_id,
656 sm_p->u.io.io_type,
657 attr->u.meta.dfile_array,
658 attr->u.meta.dfile_count,
659 sm_p->u.io.datafile_index_array,
660 &target_datafile_count,
661 sio_array,
662 &sio_count);
663 if(ret < 0)
664 {
665 js_p->error_code = ret;
666 goto sio_array_destroy;
667 }
668
669 sm_p->u.io.datafile_count = target_datafile_count;
670
671 if (target_datafile_count == 0)
672 {
673 gossip_debug(GOSSIP_IO_DEBUG, " datafile_setup_msgpairs: no "
674 "datafiles have data; aborting\n");
675
676 js_p->error_code = IO_NO_DATA;
677 goto sio_array_destroy;
678 }
679
680 gossip_debug(GOSSIP_IO_DEBUG,
681 " %s: %d datafiles "
682 "might have data\n", __func__, target_datafile_count);
683
684 /* look at sio_array and sio_count to see if there are any
685 * servers that we can do small I/O to, instead of setting up
686 * flows. For now, we're going to stick with the semantics that
687 * small I/O is only done if all of the sizes for the target datafiles
688 * are small enough (sio_count == target_datafile_count). This can
689 * be changed in the future, for example, if sio_count is some
690 * percentage of the target_datafile_count, then do small I/O to
691 * the sio_array servers, etc.
692 */
693 if(sio_count == target_datafile_count)
694 {
695 gossip_debug(GOSSIP_IO_DEBUG, " %s: doing small I/O\n", __func__);
696
697 sm_p->u.io.small_io = 1;
698 js_p->error_code = IO_DO_SMALL_IO;
699 goto sio_array_destroy;
700 }
701
702 ret = io_contexts_init(sm_p, target_datafile_count, attr);
703 if(ret < 0)
704 {
705 js_p->error_code = ret;
706 goto sio_array_destroy;
707 }
708
709 sm_p->u.io.total_cancellations_remaining = 0;
710
711 /* initialize all per server I/O operation contexts and requests */
712 for(i = 0; i < target_datafile_count; i++)
713 {
714 gossip_debug(GOSSIP_IO_DEBUG, " filling I/O request "
715 "for %llu\n", llu(sm_p->u.io.contexts[i].data_handle));
716
717 PINT_SERVREQ_IO_FILL(
718 sm_p->u.io.contexts[i].msg.req,
719 *sm_p->cred_p,
720 sm_p->object_ref.fs_id,
721 sm_p->u.io.contexts[i].data_handle,
722 sm_p->u.io.io_type,
723 sm_p->u.io.flowproto_type,
724 sm_p->u.io.datafile_index_array[i],
725 attr->u.meta.dfile_count,
726 attr->u.meta.dist,
727 sm_p->u.io.file_req,
728 sm_p->u.io.file_req_offset,
729 PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req));
730 }
731
732 js_p->error_code = 0;
733
734 sio_array_destroy:
735 free(sio_array);
736
737 goto exit;
738
739 datafile_index_array_destroy:
740 io_datafile_index_array_destroy(sm_p);
741 error_exit:
742 exit:
743 return SM_ACTION_COMPLETE;
744 }
745
746 /*
747 This is based on msgpairarray_post() in msgpairarray.c. It's
748 different enough in that we don't have to wait on the msgpairarray
749 operations to all complete before posting flows as we can do so for each
750 server individually when we're ready. this avoids the msgpairarray
751 sync point implicit in the design
752 */
753 static PINT_sm_action io_datafile_post_msgpairs(
754 struct PINT_smcb *smcb, job_status_s *js_p)
755 {
756 struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
757 int ret = -PVFS_EINVAL, i = 0;
758 unsigned long status_user_tag = 0;
759 int must_loop_encodings = 0;
760 struct server_configuration_s *server_config = NULL;
761
762 gossip_debug(GOSSIP_CLIENT_DEBUG, "io_datafile_post_msgpairs "
763 "state: post (%d message(s))\n", sm_p->u.io.datafile_count);
764
765 if (PINT_smcb_cancelled(smcb))
766 {
767 js_p->error_code = -PVFS_ECANCEL;
768 return SM_ACTION_COMPLETE;
769 }
770
771 js_p->error_code = 0;
772
773 /* completion count tracks sends/recvs separately, will increment
774 * as we go through the loop to maintain a count of outstanding msgpairs */
775 sm_p->u.io.msgpair_completion_count = 0;
776
777 for(i = 0; i < sm_p->u.io.context_count; i++)
778 {
779 PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
780 PINT_sm_msgpair_state *msg = &cur_ctx->msg;
781
782 /* do not do this one again in retry case */
783 if (cur_ctx->msg_recv_has_been_posted &&
784 cur_ctx->msg_recv_in_progress)
785 {
786 ++sm_p->u.io.msgpair_completion_count;
787 goto recv_already_posted;
788 }
789
790 if (!ENCODING_IS_VALID(sm_p->u.io.encoding))
791 {
792 PRINT_ENCODING_ERROR("supported", sm_p->u.io.encoding);
793 must_loop_encodings = 1;
794 sm_p->u.io.encoding = (ENCODING_INVALID_MIN + 1);
795 }
796 else if (!ENCODING_IS_SUPPORTED(sm_p->u.io.encoding))
797 {
798 PRINT_ENCODING_ERROR("supported", sm_p->u.io.encoding);
799 must_loop_encodings = 1;
800 sm_p->u.io.encoding = ENCODING_SUPPORTED_MIN;
801 }
802
803 try_next_encoding:
804 assert(ENCODING_IS_VALID(sm_p->u.io.encoding));
805
806 ret = PINT_encode(&msg->req, PINT_ENCODE_REQ, &msg->encoded_req,
807 msg->svr_addr, sm_p->u.io.encoding);
808 if (ret)
809 {
810 if (must_loop_encodings)
811 {
812 gossip_debug(GOSSIP_CLIENT_DEBUG, "Looping through "
813 "encodings [%d/%d]\n", sm_p->u.io.encoding,
814 ENCODING_INVALID_MAX);
815
816 sm_p->u.io.encoding++;
817 if (ENCODING_IS_VALID(sm_p->u.io.encoding))
818 {
819 goto try_next_encoding;
820 }
821 }
822 /*
823 FIXME: make this a clean error transition by adjusting
824 the completion count and/or (not) exiting
825 */
826 PVFS_perror_gossip("PINT_encode failed", ret);
827 js_p->error_code = ret;
828 return SM_ACTION_COMPLETE;
829 }
830
831 /* calculate maximum response message size and allocate it */
832 msg->max_resp_sz = PINT_encode_calc_max_size(
833 PINT_ENCODE_RESP, msg->req.op, sm_p->u.io.encoding);
834 msg->encoded_resp_p = BMI_memalloc(
835 msg->svr_addr, msg->max_resp_sz, BMI_RECV);
836 if (!msg->encoded_resp_p)
837 {
838 /* FIXME: see above FIXME */
839 js_p->error_code = -PVFS_ENOMEM;
840 return SM_ACTION_COMPLETE;
841 }
842
843 /*
844 recalculate the status user tag based on this the progress
845 of the current context like this: status_user_tag = (4 *
846 (context index) + context phase)
847 */
848 assert(cur_ctx->index == i);
849 status_user_tag = ((4 * i) + IO_SM_PHASE_REQ_MSGPAIR_RECV);
850
851 gossip_debug(GOSSIP_IO_DEBUG," posting recv with "
852 "status_user_tag=%lu (max_size %d)\n",
853 status_user_tag, msg->max_resp_sz);
854
855 cur_ctx->session_tag = PINT_util_get_next_tag();
856
857 cur_ctx->msg_recv_has_been_posted = 0;
858 cur_ctx->msg_recv_in_progress = 0;
859
860 server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
861 ret = job_bmi_recv(
862 msg->svr_addr, msg->encoded_resp_p, msg->max_resp_sz,
863 cur_ctx->session_tag, BMI_PRE_ALLOC, smcb, status_user_tag,
864 &msg->recv_status, &msg->recv_id, pint_client_sm_context,
865 server_config->client_job_bmi_timeout);
866 PINT_put_server_config_struct(server_config);
867
868 /* ret -1: problem, do not look at msg recv_status */
869 /* ret 1: immediate completion, see status */
870 /* ret 0: okay */
871
872 if (ret < 0) {
873 PVFS_perror_gossip("Post of receive failed", ret);
874 js_p->error_code = ret;
875 continue;
876
877 }
878
879 if (ret == 0) {
880 int tmp = 0;
881 /* perform a quick test to see if the recv failed before
882 * posting the send; if it reports an error quickly then
883 * we can save the confusion of sending a request for
884 * which we can't recv a response
885 */
886 ret = job_test(msg->recv_id, &tmp, NULL,
887 &msg->recv_status, 0,
888 pint_client_sm_context);
889 if (ret < 0) {
890 PVFS_perror_gossip("Post of receive failed", ret);
891 js_p->error_code = ret;
892 continue;
893 }
894 }
895
896 /* either from job_bmi_recv or from job_test finding something */
897 if (ret == 1) {
898 /*
899 * This recv must have completed with an error because the
900 * server has not yet been sent our request.
901 */
902 PVFS_perror_gossip("Receive immediately failed",
903 msg->recv_status.error_code);
904
905 ret = msg->recv_status.error_code;
906 js_p->error_code = ret;
907 continue;
908 }
909
910 cur_ctx->msg_recv_has_been_posted = 1;
911 cur_ctx->msg_recv_in_progress = 1;
912
913 /* posted the receive okay */
914 ++sm_p->u.io.msgpair_completion_count;
915
916 recv_already_posted:
917
918 if (cur_ctx->msg_send_has_been_posted &&
919 cur_ctx->msg_send_in_progress)
920 {
921 ++sm_p->u.io.msgpair_completion_count;
922 continue;
923 }
924
925 status_user_tag = ((4 * i) + IO_SM_PHASE_REQ_MSGPAIR_SEND);
926
927 cur_ctx->msg_send_has_been_posted = 0;
928 cur_ctx->msg_send_in_progress = 0;
929
930 gossip_debug(GOSSIP_IO_DEBUG," posting send with "
931 "status_user_tag=%lu\n", status_user_tag);
932
933 server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
934 ret = job_bmi_send_list(
935 msg->encoded_req.dest, msg->encoded_req.buffer_list,
936 msg->encoded_req.size_list, msg->encoded_req.list_count,
937 msg->encoded_req.total_size, cur_ctx->session_tag,
938 msg->encoded_req.buffer_type, 1, smcb, status_user_tag,
939 &msg->send_status, &msg->send_id, pint_client_sm_context,
940 server_config->client_job_bmi_timeout);
941 PINT_put_server_config_struct(server_config);
942
943 if (ret < 0) {
944 PVFS_perror_gossip("Post of send failed, cancelling recv", ret);
945 msg->op_status = msg->send_status.error_code;
946 msg->send_id = 0;
947 job_bmi_cancel(msg->recv_id, pint_client_sm_context);
948
949 js_p