PVFS 2.8.2 Source Code Reading Notes (still working)

1. Overview

First, a look at PVFS2's overall design.

1.1 Key Terminology

  • distributions: data distribution, i.e., the mapping between logical files and physical files.
  • job: a PVFS operation consists of multiple steps; each step is called a job.
  • BMI (Buffered Message Interface): the client/server communication module.
  • flows: data flows, describing how data moves between client and server; a flow does not involve metadata operations.
    • flow interface: the interface used to set up flows.
    • flow protocol: the communication protocol between the two endpoints.
    • flow endpoint: the source or destination of a flow.
    • flow descriptor: the data structure describing a flow.
  • trove: the data storage module.
  • vtags: version information for a data stream, used for consistency control.
  • instance tag: used to guarantee data consistency.
  • gossip: used for logging.

1.2 BMI Design

The BMI module handles communication between client and server, providing message ordering and flow control. All BMI communication is non-blocking.
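Since every BMI operation is non-blocking, callers follow a post/test pattern: post the operation, then poll until it completes. Below is a minimal sketch of that pattern only; the real prototypes in src/io/bmi/bmi.h take more arguments (contexts, tags, buffer types), so the two *_x functions here are hypothetical simplified stand-ins. The ret == 1 immediate-completion convention mirrors the one visible in job_flow later in these notes.

/* Illustrative post/test skeleton only: the real prototypes live in
 * src/io/bmi/bmi.h and take more arguments than these stand-ins. */
typedef int bmi_op_id_t;

int BMI_post_send_x(bmi_op_id_t *id, const void *buf, int size); /* hypothetical */
int BMI_test_x(bmi_op_id_t id, int *done, int *error);           /* hypothetical */

int send_and_wait(const void *buf, int size)
{
    bmi_op_id_t id;
    int done = 0, error = 0;

    /* posting never blocks: it may complete immediately (ret == 1) ... */
    int ret = BMI_post_send_x(&id, buf, size);
    if (ret == 1)
        return 0;   /* immediate completion */
    if (ret < 0)
        return ret; /* post failed */

    /* ... otherwise the caller polls for completion */
    while (!done)
        BMI_test_x(id, &done, &error);
    return error;
}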

1.3 State Machines

Both the client and the server use state machines to drive execution.
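As a mental model (an illustrative sketch only, not PVFS2's actual machinery, which is generated from .sm files and driven through PINT_state_machine_invoke), a state machine is a table of states, each pairing an action function with transitions keyed on the action's return code:

typedef int (*sm_action)(void *frame);

struct sm_state {
    sm_action run;   /* the state's action function */
    int next_on_ok;  /* transition taken when run() returns 0 */
    int next_on_err; /* transition taken otherwise */
};

/* drive one machine until it reaches the terminal state (-1) */
static void sm_run(const struct sm_state *table, int start, void *frame)
{
    int cur = start;
    while (cur != -1) {
        int rc = table[cur].run(frame);
        cur = (rc == 0) ? table[cur].next_on_ok : table[cur].next_on_err;
    }
}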

1.4 PVFS request

typedef struct PINT_Request PVFS_Request; — can represent any data type that an MPI_Datatype can represent.

typedef struct PINT_Request_state PVFS_Request_state; — represents the processing progress of a request.

typedef struct PINT_reqstack PVFS_reqstack; — together with the request state, it represents processing state.

PINT_Request_file_data

PINT_Process_request

Definition of PVFS_Request:

typedef struct PINT_Request {
    /* distance from the previous request */
    PVFS_offset offset;        /* offset from start of last set of elements */
    /* a block is a run of identical contiguous elements; one block holds several ereqs */
    int32_t num_ereqs;         /* number of ereqs in a block */
    int32_t num_blocks;        /* number of blocks */
    /* distance between blocks */
    PVFS_size stride;          /* stride between blocks in bytes */
    PVFS_offset ub;            /* upper bound of the type in bytes */
    PVFS_offset lb;            /* lower bound of the type in bytes */
    PVFS_size aggregate_size;  /* amount of aggregate data in bytes */
    int32_t num_contig_chunks; /* number of contiguous data chunks */
    int32_t depth;             /* number of levels of nesting */
    int32_t num_nested_req;    /* number of requests nested under this one */
    int32_t committed;         /* indicates if request has been committed */
    int32_t refcount;          /* number of references to this request struct */
    struct PINT_Request *ereq; /* element type */
    /* describes a following group of blocks of a different type */
    struct PINT_Request *sreq; /* sequence type */
} PINT_Request;

The two figures below help in understanding PINT_Request.

Request example 1

Pattern A in the figure above can be represented as:

PTYPE:
offset = OFFSET
num_ereqs = SIZE
stride = 1
num_blocks = 1
ub = SIZE
lb = 0
aggregate_size = SIZE
depth = 1
num_contig_chunks = 1
etype = PVFS_Request_byte
stype = NULL

That is, the request for A contains a single block. A block is a run of identical contiguous elements; here the block is a set of SIZE PVFS_Request_byte elements, and PVFS_Request_byte has size 1.

Pattern B in the figure can be represented as:

PTYPE:
offset = OFFSET
num_ereqs = COUNT
stride = 1
num_blocks = 1
ub = COUNT * 4
lb = 0
aggregate_size = COUNT * 4
depth = 1
num_contig_chunks = 1
etype = PVFS_Request_int
stype = NULL

PVFS_Request_int:
offset = 0
num_ereqs = 4
stride = 1
num_blocks = 1
ub = 4
lb = 0
aggregate_size = 4
depth = 0
num_contig_chunks = 1
etype = NULL
stype = NULL

B's etype is PVFS_Request_int, which has size 4; for PVFS's predefined types, etype is NULL. B again has a single block, containing COUNT PVFS_Request_int elements. depth = etype.depth + 1.

Pattern C in the figure can be represented as:

PTYPE:
offset = OFFSET
num_ereqs = ELEMENTS
stride = STRIDE
num_blocks = COUNT
ub = ((COUNT - 1) * STRIDE) + (ELEMENTS * ESIZE)
lb = 0
aggregate_size = COUNT * ELEMENTS * ESIZE
depth = 1
num_contig_chunks = COUNT
etype = ETYPE
stype = NULL

C has COUNT blocks. The blocks are not contiguous: the gap between blocks is STRIDE, the distance between the starting offsets of two adjacent blocks. Each block holds ELEMENTS elements of type ETYPE, each ESIZE bytes. ub is the relative offset of the data's upper bound and lb the lower bound; for the first chunk, lb = 0.
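To make these formulas concrete, the check below plugs hypothetical values (COUNT = 3, STRIDE = 100, ELEMENTS = 5, ESIZE = 4) into the ub and aggregate_size formulas from the listing above:

#include <stdio.h>

int main(void)
{
    /* hypothetical strided pattern: 3 blocks of 5 four-byte elements, 100 bytes apart */
    long COUNT = 3, STRIDE = 100, ELEMENTS = 5, ESIZE = 4;

    long ub = (COUNT - 1) * STRIDE + ELEMENTS * ESIZE; /* 2*100 + 20 = 220 */
    long aggregate_size = COUNT * ELEMENTS * ESIZE;    /* 3*5*4    = 60  */

    /* only 60 of the 220 bytes in the extent hold data; the rest are holes */
    printf("ub=%ld aggregate_size=%ld holes=%ld\n",
           ub, aggregate_size, ub - aggregate_size);
    return 0;
}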

Request example 2

Pattern D in the figure above is represented as:

FIRST-PTYPE:
offset = OFFSET
num_ereqs = 4
stride = 1
num_blocks = 1
ub = 764
lb = 0
aggregate_size = 380
depth = 1
num_contig_chunks = 17
etype = PVFS_Request_float
stype = NEXT-PTYPE

NEXT-PTYPE:
offset = OFFSET + 40
num_ereqs = 6
stride = 48
num_blocks = 15
ub = 764
lb = 40
aggregate_size = 364
depth = 1
num_contig_chunks = 16
etype = PVFS_Request_float
stype = LAST-PTYPE

LAST-PTYPE:
offset = OFFSET + 760
num_ereqs = 1
stride = 1
num_blocks = 1
ub = 764
lb = 760
aggregate_size = 4
depth = 1
num_contig_chunks = 1
etype = PVFS_Request_float
stype = NULL

The request is grouped by block type into three groups, each linked to the next through stype. Note the parameters ub, lb, and aggregate_size: ub is always the upper bound of the whole request, and lb is the lower bound of the current group. aggregate_size is not ub - lb in general; it is the payload of the current group plus the payload of everything after it in the stype chain (380 = 16 + 364, 364 = 360 + 4, 4 = 4), and equals ub - lb only when that span is dense.
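A quick arithmetic check of that recurrence against the figures for D (each float is 4 bytes):

#include <assert.h>

int main(void)
{
    /* payload of each group in D: 1 float, 15 blocks of 6 floats, 4 floats */
    long last  = 1 * 4;             /* LAST-PTYPE  aggregate_size =   4 */
    long next  = 15 * 6 * 4 + last; /* NEXT-PTYPE  aggregate_size = 364 */
    long first = 4 * 4 + next;      /* FIRST-PTYPE aggregate_size = 380 */

    assert(last == 4 && next == 364 && first == 380);
    return 0;
}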

Pattern E in the figure is represented as:

OUTER-PTYPE:
offset = OFFSET
num_ereqs = 2
stride = 768
num_blocks = 4
ub = 3264
lb = 0
aggregate_size = 384
depth = 2
num_contig_chunks = 16
etype = INNER-PTYPE
stype = NULL

INNER-PTYPE:
offset = 0
num_ereqs = 6
stride = 48
num_blocks = 2
ub = 96
lb = 0
aggregate_size = 48
depth = 1
num_contig_chunks = 2
etype = PVFS_Request_int
stype = NULL

Here the request is partitioned differently, from the outside in: the outer level is a group of identical blocks whose elements are themselves a group of identical blocks, so depth is 2.

Other parameters are not considered for now.

2. MPI-IO ADIO

PVFS2 has three interfaces; one of them is MPI-IO, integrated into ROMIO's ADIO module.

There are two kinds of functions: common and filesystem-specific. common is the shared set used by all filesystems; strided operations usually use the common implementations. Each filesystem subdirectory defines its own I/O functions in an ADIOI_Fns_struct structure.
ADIO_FileSysType_prefix detects the filename prefix.
ADIO_ResolveFileType decides which ADIOI_Fns_struct the operations pointer refers to; the prefix "pvfs2:", for example, selects ADIO_PVFS2_operations. The function pointer table in ADIO_PVFS2_operations:

struct ADIOI_Fns_struct ADIO_PVFS2_operations = {
    ADIOI_PVFS2_Open,           /* Open */
    ADIOI_SCALEABLE_OpenColl,   /* OpenColl */
    ADIOI_PVFS2_ReadContig,     /* ReadContig */
    ADIOI_PVFS2_WriteContig,    /* WriteContig */
    ADIOI_GEN_ReadStridedColl,  /* ReadStridedColl */
    ADIOI_GEN_WriteStridedColl, /* WriteStridedColl */
    ADIOI_GEN_SeekIndividual,   /* SeekIndividual */
    ADIOI_PVFS2_Fcntl,          /* Fcntl */
    ADIOI_PVFS2_SetInfo,        /* SetInfo */
    ADIOI_PVFS2_ReadStrided,    /* ReadStrided */
    ADIOI_PVFS2_WriteStrided,   /* WriteStrided */
    ADIOI_PVFS2_Close,          /* Close */
    ADIOI_PVFS2_IReadContig,    /* IreadContig */
    ADIOI_PVFS2_IWriteContig,   /* IwriteContig */
    ADIOI_FAKE_IODone,          /* ReadDone */
    ADIOI_FAKE_IODone,          /* WriteDone */
    ADIOI_FAKE_IOComplete,      /* ReadComplete */
    ADIOI_FAKE_IOComplete,      /* WriteComplete */
    ADIOI_FAKE_IreadStrided,    /* IreadStrided */
    ADIOI_FAKE_IwriteStrided,   /* IwriteStrided */
    ADIOI_PVFS2_Flush,          /* Flush */
    ADIOI_PVFS2_Resize,         /* Resize */
    ADIOI_PVFS2_Delete,         /* Delete */
    ADIOI_PVFS2_Feature,
};
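The prefix dispatch itself is simple string matching; the sketch below shows the idea only (ROMIO's actual ADIO_ResolveFileType has a different table layout and a stat()-based fallback for prefix-less paths, and resolve_ops here is a hypothetical helper):

#include <string.h>

struct ADIOI_Fns_struct; /* the per-filesystem ops table shown above */

struct fs_entry {
    const char *prefix;                 /* e.g. "pvfs2:" */
    const struct ADIOI_Fns_struct *ops; /* e.g. &ADIO_PVFS2_operations */
};

/* hypothetical helper: pick the ops table whose prefix matches filename */
static const struct ADIOI_Fns_struct *
resolve_ops(const struct fs_entry *table, int n, const char *filename)
{
    for (int i = 0; i < n; i++)
        if (strncmp(filename, table[i].prefix, strlen(table[i].prefix)) == 0)
            return table[i].ops;
    return NULL; /* no prefix matched; the real code falls back to stat() */
}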

To write contiguous data, adio-pvfs2 first calls PVFS_Request_contiguous to initialize two PVFS requests (file and memory). The PVFS_Request_contiguous call chain follows.

/*
 * len: total size of the data
 * PVFS_BYTE: a predefined PVFS request (extern PVFS_Request PVFS_BYTE;)
 *   static struct PINT_Request PINT_BYTE = { 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, -1, NULL, NULL };
 * mem_req: the memory-side request being built
 */

ret = PVFS_Request_contiguous(len, PVFS_BYTE, &mem_req);
ret = PVFS_Request_contiguous(len, PVFS_BYTE, &file_req);
if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
    /* PVFS_sys_write is in include/pvfs2-compat.h */
    ret = PVFS_sys_write(pvfs_fs->object_ref, file_req, offset, buf,
                         mem_req, &(pvfs_fs->credentials), &resp_io);
    fd->fp_sys_posn = offset + (int) resp_io.total_completed;
}
else {
    ret = PVFS_sys_write(pvfs_fs->object_ref, file_req, fd->fp_ind, buf,
                         mem_req, &(pvfs_fs->credentials), &resp_io);

    fd->fp_ind += (int) resp_io.total_completed;
    fd->fp_sys_posn = fd->fp_ind;
}
#ifdef HAVE_STATUS_SET_BYTES
MPIR_Status_set_bytes(status, datatype, (int) resp_io.total_completed);
#endif
*error_code = MPI_SUCCESS;
PVFS_Request_free(&file_req);
PVFS_Request_free(&mem_req);
return;
/* Definition of PVFS_Request_contiguous. */

int PVFS_Request_contiguous(int32_t count,
                            PVFS_Request oldreq,
                            PVFS_Request * newreq)
{
    return PVFS_Request_hvector(1, count, 0, oldreq, newreq);
}

/* Definition of PVFS_Request_hvector. */
/*
 * Actual call made here: PVFS_Request_hvector(1, len, 0, PVFS_BYTE, mem_req);
 */

int PVFS_Request_hvector(int32_t count,
                         int32_t blocklength,
                         PVFS_size stride,
                         PVFS_Request oldreq,
                         PVFS_Request * newreq)
{
    PVFS_size oldext;
    if (oldreq == NULL)
        return PVFS_ERR_REQ;
    PVFS_Request_extent(oldreq, &oldext);
    /* PINT_REQUEST_REFINC(oldreq); */
    *newreq = (PINT_Request *) malloc(sizeof(struct PINT_Request));
    (*newreq)->sreq = NULL;
    PINT_subreq(0, blocklength, stride, count, oldreq, oldext, newreq);
    /* calculate statistics like ub, lb, depth, etc. */
    if (stride < 0)
    {
        (*newreq)->lb = (count - 1) * stride;
    }
    PINT_REQUEST_REFSET(*newreq);
    return PVFS_SUCCESS;
}

/*
 * Actual call made here: PVFS_Request_extent(PVFS_BYTE, &oldext);
 * Computes the extent of oldreq, here PVFS_BYTE.
 */
int PVFS_Request_extent(PVFS_Request request,
                        PVFS_size * extent)
{
    if (request == NULL)
        return PVFS_ERR_REQ;
    *extent = request->ub - request->lb;
    return PVFS_SUCCESS;
}

/*
 * Composes a new request with oldreq as the element type. The code is
 * fairly straightforward.
 */

/* PINT_subreq(0, blocklength, stride, count, oldreq, oldext, newreq); */
/* PINT_subreq(0, len, 0, 1, PVFS_BYTE, oldext, mem_req); */

static int PINT_subreq(PVFS_offset offset,
                       int32_t bsize,
                       PVFS_size stride,
                       int32_t count,
                       PVFS_Request oldreq,
                       PVFS_size oldext,
                       PVFS_Request * newreq)
{
    if (oldreq == NULL)
        return PVFS_ERR_REQ;
    /* initialize the new PVFS request */
    (*newreq)->offset = offset;
    (*newreq)->num_ereqs = bsize;
    (*newreq)->stride = stride;
    (*newreq)->num_blocks = count;
    (*newreq)->ub = offset + ((count - 1) * stride) + (bsize * oldext);
    (*newreq)->lb = offset;
    (*newreq)->aggregate_size = oldreq->aggregate_size * count * bsize;
    /* compute num_contig_chunks */
    if (oldreq->aggregate_size == oldext && oldreq->num_contig_chunks == 1)
        /* oldreq is contiguous */
        if (count == 1 || stride == bsize * oldext || -stride == bsize * oldext)
            /* newreq will be contiguous */
            (*newreq)->num_contig_chunks = 1;
        else
            /* each block will be contiguous */
            (*newreq)->num_contig_chunks = count;
    else
        /* nothing is contiguous */
        (*newreq)->num_contig_chunks =
                oldreq->num_contig_chunks * bsize * count;
    (*newreq)->depth = oldreq->depth + 1;
    (*newreq)->num_nested_req = oldreq->num_nested_req + 1;
    (*newreq)->committed = 0;
    (*newreq)->refcount = 0;
    (*newreq)->ereq = oldreq;
    PINT_REQUEST_REFINC(oldreq);
    return PVFS_SUCCESS;
}
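Tracing the contiguous case through the code above: PVFS_Request_contiguous(len, PVFS_BYTE, &mem_req) reaches PINT_subreq(0, len, 0, 1, PVFS_BYTE, 1, &mem_req) with oldext = 1, so the new request comes out as one contiguous block. A check of the field arithmetic, assuming len = 4096:

#include <assert.h>

int main(void)
{
    long len = 4096, oldext = 1;             /* PVFS_BYTE: ub - lb = 1 */
    long count = 1, bsize = len, stride = 0; /* PINT_subreq(0, len, 0, 1, ...) */

    long ub = 0 + (count - 1) * stride + bsize * oldext;  /* = len */
    long aggregate_size = 1 /* PVFS_BYTE */ * count * bsize;

    /* oldreq is contiguous and count == 1, so num_contig_chunks == 1 */
    assert(ub == len && aggregate_size == len);
    return 0;
}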

PVFS_sys_write is then called to perform the write; its call chain follows.

#define PVFS_sys_write(ref,req,off,buf,mem_req,creds,resp) \
PVFS_sys_io(ref,req,off,buf,mem_req,creds,resp,PVFS_IO_WRITE,PVFS_HINT_NULL)

struct ADIOI_PVFS2_fs_s {
PVFS_object_ref object_ref;
PVFS_credentials credentials;
} ADIOI_PVFS2_fs_s;

typedef struct ADIOI_PVFS2_fs_s ADIOI_PVFS2_fs;

typedef struct
{
PVFS_handle handle;
PVFS_fs_id fs_id;
int32_t __pad1;
} PVFS_object_ref;

struct PVFS_sysresp_io_s
{
PVFS_size total_completed;
};
typedef struct PVFS_sysresp_io_s PVFS_sysresp_io;

/* call site:
   PVFS_sys_write(pvfs_fs->object_ref, file_req, offset, buf,
                  mem_req, &(pvfs_fs->credentials), &resp_io); */

/** Perform a read or write operation.
*
* \param type specifies if the operation is a read or write.
*/
PVFS_error PVFS_sys_io(
PVFS_object_ref ref,
PVFS_Request file_req,
PVFS_offset file_req_offset,
void *buffer,
PVFS_Request mem_req,
const PVFS_credentials *credentials,
PVFS_sysresp_io *resp_p,
enum PVFS_io_type io_type,
PVFS_hint hints)
{
PVFS_error ret = -PVFS_EINVAL, error = 0;
PVFS_sys_op_id op_id;

gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_io entered\n");

/* initiate the read/write; this presumably fills in op_id, which is then
   used to drive the server-side operation */
ret = PVFS_isys_io(ref, file_req, file_req_offset, buffer, mem_req,
credentials, resp_p, io_type, &op_id, hints, NULL);
if (ret == 1)
return 0;
else if (ret < 0)
{
PVFS_perror_gossip("PVFS_isys_io call", ret);
error = ret;
}
else
{
/* send the I/O request and wait for it to complete */
ret = PVFS_sys_wait(op_id, "io", &error);
if (ret)
{
PVFS_perror_gossip("PVFS_sys_wait call", ret);
error = ret;
}
PINT_sys_release(op_id);
}

return error;
}

/** Initiate a read or write operation.
*
* \param type specifies if the operation is a read or write.
*/
PVFS_error PVFS_isys_io(
/* ref is a PVFS file object (file, directory, or symlink) */
PVFS_object_ref ref,
PVFS_Request file_req,
PVFS_offset file_req_offset,
void *buffer,
PVFS_Request mem_req,
const PVFS_credentials *credentials,
PVFS_sysresp_io *resp_p,
enum PVFS_io_type io_type,
PVFS_sys_op_id *op_id,
PVFS_hint hints,
void *user_ptr)
{
PVFS_error ret = -PVFS_EINVAL;
PINT_smcb *smcb = NULL;
PINT_client_sm *sm_p = NULL;
struct filesystem_configuration_s* cur_fs = NULL;
struct server_configuration_s *server_config = NULL;

gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_io entered [%llu]\n",
llu(ref.handle));

if ((ref.handle == PVFS_HANDLE_NULL) ||
(ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
{
gossip_err("invalid (NULL) required argument\n");
return ret;
}

if ((io_type != PVFS_IO_READ) && (io_type != PVFS_IO_WRITE))
{
gossip_err("invalid (unknown) I/O type specified\n");
return ret;
}

/* locate the filesystem configuration: first the server config (fs.conf) */
server_config = PINT_get_server_config_struct(ref.fs_id);
cur_fs = PINT_config_find_fs_id(server_config, ref.fs_id);
PINT_put_server_config_struct(server_config);
/* configuration lookup done */


if (!cur_fs)
{
gossip_err("invalid (unknown) fs id specified\n");
return ret;
}

/* look for zero byte operations */
if ((PINT_REQUEST_TOTAL_BYTES(mem_req) == 0) ||
(PINT_REQUEST_TOTAL_BYTES(file_req) == 0))
{
gossip_ldebug(GOSSIP_IO_DEBUG, "Warning: 0 byte I/O operation "
"attempted.\n");
resp_p->total_completed = 0;
return 1;
}
/* allocate the smcb; the operation type PVFS_SYS_IO is recorded in it */
/* one smcb corresponds to one running state machine instance */
PINT_smcb_alloc(&smcb, PVFS_SYS_IO,
sizeof(struct PINT_client_sm),
client_op_state_get_machine,
client_state_machine_terminate,
pint_client_sm_context);
/* check that allocation succeeded */
if (smcb == NULL)
{
return -PVFS_ENOMEM;
}
/* sm_p points to the PINT_client_sm frame */
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);

PINT_init_msgarray_params(sm_p, ref.fs_id);
PINT_init_sysint_credentials(sm_p->cred_p, credentials);

sm_p->u.io.io_type = io_type;
sm_p->u.io.file_req = file_req;
sm_p->u.io.file_req_offset = file_req_offset;
sm_p->u.io.io_resp_p = resp_p;
sm_p->u.io.mem_req = mem_req;
sm_p->u.io.buffer = buffer;
sm_p->u.io.flowproto_type = cur_fs->flowproto;
sm_p->u.io.encoding = cur_fs->encoding;
sm_p->u.io.stored_error_code = 0;
sm_p->u.io.retry_count = 0;
sm_p->msgarray_op.msgarray = NULL;
sm_p->msgarray_op.count = 0;
sm_p->u.io.datafile_index_array = NULL;
sm_p->u.io.datafile_count = 0;
sm_p->u.io.total_size = 0;
sm_p->u.io.small_io = 0;
sm_p->object_ref = ref;


PVFS_hint_copy(hints, &sm_p->hints);
PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &ref.handle);

/* post the request: this starts the state machine and waits for completion */
return PINT_client_state_machine_post(
smcb, op_id, user_ptr);

}

/** Adds a state machine into the list of machines that are being
* actively serviced.
*/
PVFS_error PINT_client_state_machine_post(
PINT_smcb *smcb,
PVFS_sys_op_id *op_id,
void *user_ptr /* in */)
{
PINT_sm_action sm_ret;
PVFS_error ret = -PVFS_EINVAL;
job_status_s js;
int pvfs_sys_op = PINT_smcb_op(smcb);
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);

PVFS_hint_add_internal(&sm_p->hints, PINT_HINT_OP_ID, sizeof(pvfs_sys_op), &pvfs_sys_op);

PINT_EVENT_START(PINT_client_sys_event_id, pint_client_pid, NULL, &sm_p->event_id,
PINT_HINT_GET_CLIENT_ID(sm_p->hints),
PINT_HINT_GET_RANK(sm_p->hints),
PINT_HINT_GET_REQUEST_ID(sm_p->hints),
PINT_HINT_GET_HANDLE(sm_p->hints),
pvfs_sys_op);

gossip_debug(GOSSIP_CLIENT_DEBUG,
"PINT_client_state_machine_post smcb %p, op: %s\n",
smcb, PINT_client_get_name_str(smcb->op));

CLIENT_SM_ASSERT_INITIALIZED();

if (!smcb)
{
return ret;
}

memset(&js, 0, sizeof(js));

/* save operation type; mark operation as unfinished */
sm_p->user_ptr = user_ptr;

gen_mutex_lock(&test_mutex);
/*
start state machine and continue advancing while we're getting
immediate completions
*/
/* start the state machine; the operation officially begins here */
sm_ret = PINT_state_machine_start(smcb, &js);
assert(SM_ACTION_ISVALID(sm_ret));

if(sm_ret < 0)
{
/* state machine code failed */
gen_mutex_unlock(&test_mutex);
return sm_ret;
}

/* check whether the state machine already ran to completion */
if (PINT_smcb_complete(smcb))
{
assert(sm_ret == SM_ACTION_TERMINATE);

PINT_EVENT_END(PINT_client_sys_event_id, pint_client_pid, NULL, sm_p->event_id, 0);

*op_id = -1;

/* free the smcb and any other extra data allocated there */
PINT_sys_release_smcb(smcb);

gossip_debug(
GOSSIP_CLIENT_DEBUG, "Posted %s (%llu) "
"(ran to termination)(%d)\n",
PINT_client_get_name_str(pvfs_sys_op),
llu((op_id ? *op_id : -1)),
js.error_code);

}
else
{
assert(sm_ret == SM_ACTION_DEFERRED);

PINT_id_gen_safe_register(&sm_p->sys_op_id, (void *)smcb);
if (op_id)
{
*op_id = sm_p->sys_op_id;
}

gossip_debug(
GOSSIP_CLIENT_DEBUG, "Posted %s (%lld) "
"(waiting for test)(%d)\n",
PINT_client_get_name_str(pvfs_sys_op),
lld((op_id ? *op_id : -1)),
ret);
}
gen_mutex_unlock(&test_mutex);
return js.error_code;
}

Diagram of the I/O state machine:

io-statemachine

Definition of the client state machine (PINT_client_sm):


typedef struct PINT_client_sm
{
/* this code removed and corresponding fields added to the generic
* state machine code in the PINT_smcb struct
*/
/* used internally by client-state-machine.c */
PVFS_sys_op_id sys_op_id;
void *user_ptr;

PINT_event_id event_id;

/* stores the final operation error code on operation exit */
PVFS_error error_code;

int comp_ct; /* used to keep up with completion of multiple
* jobs for some states; typically set and
* then decremented to zero as jobs complete */

/* generic getattr used with getattr sub state machines */
PINT_sm_getattr_state getattr;
/* generic dirent array used by both readdir and readdirplus state machines */
PINT_sm_readdir_state readdir;

/* fetch_config state used by the nested fetch config state machines */
struct PINT_server_fetch_config_sm_state fetch_config;

PVFS_hint hints;

PINT_sm_msgarray_op msgarray_op;

PVFS_object_ref object_ref;
PVFS_object_ref parent_ref;

PVFS_credentials *cred_p;
union
{
struct PINT_client_remove_sm remove;
struct PINT_client_create_sm create;
struct PINT_client_mkdir_sm mkdir;
struct PINT_client_symlink_sm sym;
struct PINT_client_getattr_sm getattr;
struct PINT_client_setattr_sm setattr;
struct PINT_client_io_sm io;
struct PINT_client_flush_sm flush;
struct PINT_client_readdir_sm readdir;
struct PINT_client_readdirplus_sm readdirplus;
struct PINT_client_lookup_sm lookup;
struct PINT_client_rename_sm rename;
struct PINT_client_mgmt_setparam_list_sm setparam_list;
struct PINT_client_truncate_sm truncate;
struct PINT_client_mgmt_statfs_list_sm statfs_list;
struct PINT_client_mgmt_perf_mon_list_sm perf_mon_list;
struct PINT_client_mgmt_event_mon_list_sm event_mon_list;
struct PINT_client_mgmt_iterate_handles_list_sm iterate_handles_list;
struct PINT_client_mgmt_get_dfile_array_sm get_dfile_array;
struct PINT_client_mgmt_remove_dirent_sm mgmt_remove_dirent;
struct PINT_client_mgmt_create_dirent_sm mgmt_create_dirent;
struct PINT_client_mgmt_get_dirdata_handle_sm mgmt_get_dirdata_handle;
struct PINT_server_get_config_sm get_config;
struct PINT_client_geteattr_sm geteattr;
struct PINT_client_seteattr_sm seteattr;
struct PINT_client_deleattr_sm deleattr;
struct PINT_client_listeattr_sm listeattr;
struct PINT_client_perf_count_timer_sm perf_count_timer;
struct PINT_sysdev_unexp_sm sysdev_unexp;
struct PINT_client_job_timer_sm job_timer;
} u;
} PINT_client_sm;

Definition of the I/O state machine state (PINT_client_io_sm):


struct PINT_client_io_sm
{
/* input parameters */
enum PVFS_io_type io_type;
PVFS_Request file_req;
PVFS_offset file_req_offset;
void *buffer;
PVFS_Request mem_req;

/* output parameter */
PVFS_sysresp_io *io_resp_p;

enum PVFS_flowproto_type flowproto_type;
enum PVFS_encoding_type encoding;

int *datafile_index_array;
int datafile_count;

int msgpair_completion_count;
int flow_completion_count;
int write_ack_completion_count;

PINT_client_io_ctx *contexts;
int context_count;

int total_cancellations_remaining;

int retry_count;
int stored_error_code;

PVFS_size total_size;

PVFS_size * dfile_size_array;
int small_io;
};

Definition of the state machine control block:

/* State machine control block - one per running instance of a state
* machine
*/
typedef struct PINT_smcb
{
/* state machine execution variables */
int stackptr;
struct PINT_state_s *current_state;
struct PINT_state_stack_s state_stack[PINT_STATE_STACK_SIZE];

struct qlist_head frames; /* circular list of frames */
int base_frame; /* index of current base frame */
int frame_count; /* number of frames in list */

/* usage specific routine to look up SM from OP */
struct PINT_state_machine_s *(*op_get_state_machine)(int);
/* state machine context and control variables */
int op; /* this field externally indicates type of state machine */
PVFS_id_gen_t op_id; /* unique ID for this operation */
struct PINT_smcb *parent_smcb; /* points to parent smcb or NULL */
int op_terminate; /* indicates SM is ready to terminate */
int op_cancelled; /* indicates SM operation was cancelled */
int children_running; /* the number of child SMs running */
int op_completed; /* indicates SM operation was added to completion Q */
/* add a lock here */
job_context_id context; /* job context when waiting for children */
int (*terminate_fn)(struct PINT_smcb *, job_status_s *);
void *user_ptr; /* external user pointer */
int immediate; /* specifies immediate completion of the state machine */
} PINT_smcb;

Each system operation in PVFS2 (a VFS file operation) corresponds to one state machine. A running state machine instance corresponds to one state machine control block, the smcb. At runtime there are two stacks: a state stack and a frame stack; the frame stack holds the parameters associated with the state machine.


/* Function: PINT_smcb_alloc
Params: pointer to an smcb pointer, an op code (int), size of frame
(int), pointer to function to locate SM
Returns: nothing, but fills in pointer argument
Synopsis: this allocates an smcb struct, including its frame stack
and sets the op code so you can start the state machine
*/
int PINT_smcb_alloc(
struct PINT_smcb **smcb,
int op,
int frame_size,
struct PINT_state_machine_s *(*getmach)(int),
int (*term_fn)(struct PINT_smcb *, job_status_s *),
job_context_id context_id)
{
*smcb = (struct PINT_smcb *)malloc(sizeof(struct PINT_smcb));
if (!(*smcb))
{
return -PVFS_ENOMEM;
}
/* zero out all members */
memset(*smcb, 0, sizeof(struct PINT_smcb));

INIT_QLIST_HEAD(&(*smcb)->frames);
(*smcb)->base_frame = -1; /* no frames yet */
(*smcb)->frame_count = 0;

/* if frame_size given, allocate a frame */
if (frame_size > 0)
{
void *new_frame = malloc(frame_size);
if (!new_frame)
{
free(*smcb);
*smcb = NULL;
return -PVFS_ENOMEM;
}
/* zero out all members */
memset(new_frame, 0, frame_size);
PINT_sm_push_frame(*smcb, 0, new_frame);
(*smcb)->base_frame = 0;
}
(*smcb)->op = op;
(*smcb)->op_get_state_machine = getmach;
(*smcb)->terminate_fn = term_fn;
(*smcb)->context = context_id;
/* if a getmach given, lookup state machine */
if (getmach)
return PINT_state_machine_locate(*smcb);
return 0; /* success */
}


/* Function: PINT_state_machine_locate(void)
Params: smcb pointer with op correctly set
Returns: 1 on successful locate, 0 on locate failure, <0 on error
Synopsis: This function locates the state associated with the op
specified in smcb->op in order to start a state machine's
execution.
*/
int PINT_state_machine_locate(struct PINT_smcb *smcb)
{
struct PINT_state_s *current_tmp;
struct PINT_state_machine_s *op_sm;
const char *state_name;
const char *machine_name;

/* check for valid inputs */
if (!smcb || smcb->op < 0 || !smcb->op_get_state_machine)
{
gossip_err("State machine requested not valid\n");
return -PVFS_EINVAL;
}
gossip_debug(GOSSIP_STATE_MACHINE_DEBUG,
"[SM Locating]: (%p) op-id: %d\n",smcb,(smcb)->op);
/* this is the usage dependent routine to look up the SM */
op_sm = (*smcb->op_get_state_machine)(smcb->op);
if (op_sm != NULL)
{
current_tmp = op_sm->first_state;
/* handle the case in which the first state points to a nested
* machine, rather than a simple function
*/
while(current_tmp->flag == SM_JUMP)
{
PINT_push_state(smcb, current_tmp);
current_tmp = ((struct PINT_state_machine_s *)
current_tmp->action.nested)->first_state;
}
smcb->current_state = current_tmp;

state_name = PINT_state_machine_current_state_name(smcb);
machine_name = PINT_state_machine_current_machine_name(smcb);

gossip_debug(GOSSIP_STATE_MACHINE_DEBUG,
"[SM Locating]: (%p) located: %s:%s\n",
smcb, machine_name, state_name);

return 1; /* indicates successful locate */
}
gossip_err("State machine not found for operation %d\n",smcb->op);
return 0; /* indicates failed to locate */
}

PINT_smcb_alloc() allocates an smcb, initializes it, and initializes its frame stack. The function pointer client_op_state_get_machine is passed in and stored in smcb->op_get_state_machine. PINT_state_machine_locate(*smcb) is then called; it invokes smcb->op_get_state_machine, i.e. client_op_state_get_machine, to find the state machine matching the op (each op corresponds to one SM; SMs are defined in the .sm files, or see the declaration section of the generated .c files) and initializes smcb->current_state. If the SM's first state is a jump (SM_JUMP), the code keeps descending into nested machines until it reaches one whose first state is not a jump, and uses that machine's first_state as smcb->current_state.

Control then returns to PINT_client_state_machine_post(), which calls PINT_state_machine_start(), i.e. starts the I/O state machine. PINT_state_machine_start calls PINT_state_machine_invoke(), which executes current_state's action.func; this is how control jumps into the state machine's own logic.

At the start of each state, the first action is to fetch the smcb's frame from the frame stack; the frame is modified as the state executes. If execution jumps into a nested state machine, the frame must be pushed onto the frame stack for it and is popped when the nested machine exits.
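Concretely, a state action has roughly the shape below. The signature follows the types that already appear in these notes (PINT_sm_action, job_status_s, PINT_sm_frame); the state name and body are illustrative only, and the sketch assumes the PVFS2 client-state-machine headers.

/* illustrative only: the shape of a state action function */
static PINT_sm_action io_example_state_action(
    struct PINT_smcb *smcb, job_status_s *js_p)
{
    /* first step of every state: fetch this machine's frame */
    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);

    if (js_p->error_code != 0)
        sm_p->u.io.stored_error_code = js_p->error_code;

    /* return SM_ACTION_DEFERRED instead if a job was posted and the
       machine must wait for it before advancing */
    return SM_ACTION_COMPLETE;
}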

From the description of the I/O state machine, the most important states are io_datafile_setup_msgpairs, io_datafile_post_msgpairs, and io_datafile_complete_operations.

  • The state io_datafile_setup_msgpairs runs the function io_datafile_setup_msgpairs, which calls io_find_target_datafiles to split the request into sub-requests.

  • The state io_datafile_post_msgpairs runs the function io_datafile_post_msgpairs.

  • The state io_datafile_complete_operations runs the function io_datafile_complete_operations, which calls io_post_flow.

io_post_flow is a key function: it initializes cur_ctx, which stores the flow's context, and calls job_flow. job_flow calls PINT_flow_post, which formally starts an I/O request via active_flowproto_table[flowproto_id]->flowproto_post(flow_d). That call resolves to fp_multiqueue_post in flow/flowproto-bmi-trove/flowproto-multiqueue.c; fp_multiqueue_post calls mem_to_bmi_callback_fn, which calls BMI_post_send_list; BMI_post_send_list calls BMI_tcp_post_send_list, which in turn calls tcp_post_send_generic.

Another function in mem_to_bmi_callback_fn worth noting is PINT_process_request, which calls PINT_Distribute, apparently to advance the sub-requests (to be confirmed).

mem_to_bmi_callback_fn uses the fp_queue_item structure to track the flow; q_item->result_chain.result records the actual requests.

/* the type of q_item->result_chain.result */
typedef struct PINT_Request_result {
    PVFS_offset *offset_array; /* array of offsets, one per segment output */
    PVFS_size *size_array;     /* array of sizes, one per segment output */
    int32_t segmax;            /* maximum number of segments to output */
    int32_t segs;              /* number of segments output */
    PVFS_size bytemax;         /* maximum number of bytes to output */
    PVFS_size bytes;           /* running count of bytes output */
} PINT_Request_result;
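A hedged sketch of how such a result structure is driven: the caller supplies the output arrays, caps the output with segmax and bytemax, and lets request processing fill in the segments. The PINT_new_request_state/PINT_process_request prototypes below are written from memory of src/io/description/pint-request.h and should be verified against the source.

#include <string.h>

/* enumerate the contiguous (offset, size) pieces of file_req that live on
   this server; prototypes per my reading of pint-request.h (verify!) */
static void list_segments(PVFS_Request file_req,
                          PINT_request_file_data *rfdata)
{
    PVFS_offset offsets[16];
    PVFS_size sizes[16];
    PINT_Request_result result;
    PINT_Request_state *rs;

    memset(&result, 0, sizeof(result));
    result.offset_array = offsets;
    result.size_array = sizes;
    result.segmax = 16;     /* emit at most 16 segments per call */
    result.bytemax = 65536; /* emit at most 64 KiB per call */

    rs = PINT_new_request_state(file_req);
    /* each call appends up to segmax segments / bytemax bytes */
    PINT_process_request(rs, NULL, rfdata, &result, PINT_SERVER);
    /* offsets[0..result.segs-1] / sizes[0..result.segs-1] now describe
       the contiguous pieces; result.bytes is the running byte count */
}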
/* cur_ctx initialization */
cur_ctx->flow_desc.file_data.fsize = sm_p->u.io.dfile_size_array[cur_ctx->index];
cur_ctx->flow_desc.file_data.dist = attr->u.meta.dist;
cur_ctx->flow_desc.file_data.server_nr = cur_ctx->server_nr;
cur_ctx->flow_desc.file_data.server_ct = attr->u.meta.dfile_count;
cur_ctx->flow_desc.file_req = sm_p->u.io.file_req;
cur_ctx->flow_desc.file_req_offset = sm_p->u.io.file_req_offset;
cur_ctx->flow_desc.mem_req = sm_p->u.io.mem_req;
cur_ctx->flow_desc.tag = cur_ctx->session_tag;
cur_ctx->flow_desc.type = sm_p->u.io.flowproto_type;
cur_ctx->flow_desc.user_ptr = NULL;

if (sm_p->u.io.io_type == PVFS_IO_READ)
{
cur_ctx->flow_desc.file_data.extend_flag = 0;
cur_ctx->flow_desc.src.endpoint_id = BMI_ENDPOINT;
cur_ctx->flow_desc.src.u.bmi.address = cur_ctx->msg.svr_addr;
cur_ctx->flow_desc.dest.endpoint_id = MEM_ENDPOINT;
cur_ctx->flow_desc.dest.u.mem.buffer = sm_p->u.io.buffer;
}
else
{
assert(sm_p->u.io.io_type == PVFS_IO_WRITE);
cur_ctx->flow_desc.file_data.extend_flag = 1;
cur_ctx->flow_desc.src.endpoint_id = MEM_ENDPOINT;
cur_ctx->flow_desc.src.u.mem.buffer = sm_p->u.io.buffer;
cur_ctx->flow_desc.dest.endpoint_id = BMI_ENDPOINT;
/* address of the destination server */
cur_ctx->flow_desc.dest.u.bmi.address = cur_ctx->msg.svr_addr;
}

/* job_flow definition (src/io/job) */
/* job_flow()
*
* posts a job to service a flow (where a flow is a complex I/O
* operation between two endpoints, which may be memory, disk, or
* network)
*
* returns 0 on success, 1 on immediate completion, and -errno on
* failure
*/
int job_flow(flow_descriptor * flow_d,
void *user_ptr,
job_aint status_user_tag,
job_status_s * out_status_p,
job_id_t * id,
job_context_id context_id,
int timeout_sec,
PVFS_hint hints)
{
struct job_desc *jd = NULL;
int ret = -1;

/* allocate job descriptor first */
jd = alloc_job_desc(JOB_FLOW);
if (!jd)
{
out_status_p->error_code = -PVFS_ENOMEM;
return 1;
}
jd->hints = hints;
flow_d->hints = hints;
jd->job_user_ptr = user_ptr;
jd->u.flow.flow_d = flow_d;
jd->context_id = context_id;
jd->status_user_tag = status_user_tag;
flow_d->user_ptr = jd;
flow_d->callback = flow_callback;

/* post the flow */

ret = PINT_flow_post(flow_d);
if (ret < 0)
{
out_status_p->error_code = ret;
out_status_p->status_user_tag = status_user_tag;
dealloc_job_desc(jd);
jd = NULL;
return (1);
}
if (ret == 1)
{
/* immediate completion */
out_status_p->error_code = 0;
out_status_p->status_user_tag = status_user_tag;
out_status_p->actual_size = flow_d->total_transferred;
dealloc_job_desc(jd);
jd = NULL;
return (1);
}

/* queue up the job desc. for later completion */
*id = jd->job_id;
flow_pending_count++;
gossip_debug(GOSSIP_FLOW_DEBUG, "Job flows in progress (post time): %d\n",
flow_pending_count);

return(job_time_mgr_add(jd, timeout_sec));
}

Function pointers involved in PVFS2 data distribution:

/* Distribution functions that must be supplied by each dist implementation */
typedef struct PINT_dist_methods_s
{
/* Returns the physical storage offset for a logical file offset */
PVFS_offset (*logical_to_physical_offset)(void* params,
PINT_request_file_data* rf_data,
PVFS_offset logical_offset);

/* Returns the logical file offset for a given physical storage offset */
PVFS_offset (*physical_to_logical_offset)(void* params,
PINT_request_file_data* rf_data,
PVFS_offset physical_offset);

/* Returns the next physical offset for the file on server_nr given an
* arbitrary logical offset (i.e. an offset anywhere in the file) */
PVFS_offset (*next_mapped_offset)(void* params,
PINT_request_file_data* rf_data,
PVFS_offset logical_offset);

/* Returns the contiguous length of file data starting at physical_offset*/
PVFS_size (*contiguous_length)(void* params,
PINT_request_file_data* rf_data,
PVFS_offset physical_offset);

/* Returns the logical file size */
PVFS_size (*logical_file_size)(void* params,
uint32_t num_handles,
PVFS_size *psizes);

/* Returns the number of data file objects to use for a file */
int (*get_num_dfiles)(void* params,
uint32_t num_servers_available,
uint32_t num_dfiles_requested);

/* Sets the parameter designated by name to the given value */
int (*set_param)(const char* dist_name, void* params,
const char* param_name, void* value);

/* Retrieves a blocksize value suitable to report in stat() */
PVFS_size (*get_blksize)(void* params);

/* Stores parameters in lebf memory at pptr */
void (*encode_lebf)(char **pptr, void* params);

/* Restores parameters in lebf memory at pptr */
void (*decode_lebf)(char **pptr, void* params);

/* Called when the distribution is registered */
void (*registration_init)(void* params);

char *(*params_string)(void *params);
} PINT_dist_methods;
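As an illustration of the first method, round-robin striping (the scheme implemented by PVFS2's simple_stripe distribution) maps a logical offset to a physical offset roughly as follows. This is the generic striping arithmetic, not the actual dist-simple-stripe.c code:

#include <stdint.h>

/* illustrative round-robin striping: in every full stripe of
   strip * server_ct bytes, server_nr owns the strip starting at
   server_nr * strip; assumes `logical` falls inside that strip */
static int64_t logical_to_physical(int64_t logical, int64_t strip,
                                   int64_t server_nr, int64_t server_ct)
{
    int64_t stripe = strip * server_ct; /* bytes in one full stripe */
    int64_t full = logical / stripe;    /* full stripes before this one */

    return full * strip + (logical % stripe - server_nr * strip);
}
/* e.g. strip = 64 KiB, server_ct = 4, server_nr = 1: logical 64 KiB is the
   first byte of server 1's first strip and maps to physical offset 0 */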

PINT_flow_post() is the function that posts a flow; the data itself goes out through BMI_post_send_list().

The server sends an ack (io_send_ack()) to the client; after the client receives this ack, server and client begin transferring data.

trove has three modes: null, aio (the default), and directio.

The data structures and function-pointer tables are defined in trove-mgmt.c.

The files that appear most important: dbpf-alt-aio.c, dbpf-bstream-aio.c, dbpf-bstream.c, dbpf-context.c, dbpf-collection.c, dbpf-mgmt.c, dbpf-op-queue.c, dbpf-op.c, dbpf-sync.c, dbpf-thread.c.

dbpf-alt-aio.c initializes two data structures; alt_lio_thread gets called.

static struct dbpf_aio_ops alt_aio_ops =
{
    alt_aio_read,  /* these are empty stub functions */
    alt_aio_write,
    alt_lio_listio, /* not a stub; unclear whether it gets called */
    alt_aio_error,
    alt_aio_return,
    alt_aio_cancel,
    alt_aio_suspend,
    alt_aio_fsync
};
struct TROVE_bstream_ops alt_aio_bstream_ops =
{
    dbpf_bstream_read_at,
    dbpf_bstream_write_at,
    dbpf_bstream_resize,
    dbpf_bstream_validate,
    alt_aio_bstream_read_list, /* this one is called; it calls dbpf_bstream_rw_list */
    alt_aio_bstream_write_list,
    dbpf_bstream_flush,
    NULL
};

dbpf-bstream-aio.c contains dbpf_bstream_listio_convert, which gets called.
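Conceptually, such a list-I/O conversion turns arrays of (buffer, offset, size) triples into an array of POSIX struct aiocb entries and submits them as one batch. A sketch using the standard POSIX AIO API (not the actual dbpf code, which also tracks per-operation completion state):

#include <aio.h>
#include <string.h>

#define MAXCB 64 /* sketch limit on batch size */

/* build one aiocb per (buffer, offset, size) triple and submit the batch;
   LIO_NOWAIT returns at once, so `cbs` must stay alive until the caller has
   seen aio_error() != EINPROGRESS (and aio_return()) for every entry */
static int submit_write_list(int fd, void *bufs[], off_t offs[],
                             size_t sizes[], int n, struct aiocb cbs[])
{
    struct aiocb *list[MAXCB];
    if (n > MAXCB)
        return -1;
    memset(cbs, 0, n * sizeof(*cbs));
    for (int i = 0; i < n; i++) {
        cbs[i].aio_fildes = fd;
        cbs[i].aio_buf = bufs[i];
        cbs[i].aio_nbytes = sizes[i];
        cbs[i].aio_offset = offs[i];
        cbs[i].aio_lio_opcode = LIO_WRITE;
        list[i] = &cbs[i];
    }
    return lio_listio(LIO_NOWAIT, list, n, NULL);
}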

dbpf-bstream.c initializes two data structures.

static struct dbpf_aio_ops aio_ops =
{
    aio_read,
    aio_write,
    lio_listio,
    aio_error,
    aio_return,
    aio_cancel,
    aio_suspend,
    aio_fsync
};
struct TROVE_bstream_ops dbpf_bstream_ops =
{
    dbpf_bstream_read_at,
    dbpf_bstream_write_at,
    dbpf_bstream_resize,
    dbpf_bstream_validate,
    dbpf_bstream_read_list, /* this one is not called; it calls dbpf_bstream_rw_list */
    dbpf_bstream_write_list,
    dbpf_bstream_flush,
    dbpf_bstream_cancel
};

In dbpf-op.c, dbpf_queued_op_init is called.

In dbpf-sync.c, dbpf_sync_coalesce_enqueue is called.