linux下IO应用详解

一. 对于I/O的概述

Linux磁盘I/O分为Buffered IO和Direct IO,这两者有何区别?

Buffered IO:
当应用程序尝试读取某块数据的时候,如果这块数据已经存放在了页缓存(page cache)中,那么这块数据就可以立即返回给应用程序,而不需要经过实际的物理读盘操作。当然,如果数据在应用程序读取之前并未被存放在页缓存中,那么就需要先将数据从磁盘读到页缓存中去。对于写操作来说,应用程序也会将数据先写到页缓存中去,数据是否被立即写到磁盘上去取决于应用程序所采用的写操作机制:如果用户采用的是同步写机制( synchronous writes ),那么数据会立即被写回到磁盘上,应用程序会一直等到数据被写完为止;如果用户采用的是延迟写机制( deferred writes ),那么应用程序就完全不需要等到数据全部被写回到磁盘,数据只要被写到页缓存中去就可以了。在延迟写机制的情况下,操作系统会定期地将放在页缓存中的数据刷到磁盘上。与异步写机制( asynchronous writes )不同的是,延迟写机制在数据完全写到磁盘上的时候不会通知应用程序,而异步写机制在数据完全写到磁盘上的时候是会返回给应用程序的。所以延迟写机制本身是存在数据丢失的风险的,而异步写机制则不会有这方面的担心。

总结下,Buffered IO的特点是使用了内存缓存,如:

  • 读操作:硬盘->内核页缓存->用户缓冲区
  • 写操作:用户缓冲区->内核页缓存->硬盘

对Buffered IO,数据在传输过程中需要在应用程序地址空间和页缓存之间进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

对于某些特殊的应用程序(如数据库)来说,避开操作系统内核缓冲区而直接在应用程序地址空间和磁盘之间传输数据会比使用操作系统内核缓冲区获取更好的性能。

Direct IO:
Direct-io的目的在于绕过文件系统(ext)的cache,直接对block设备上的文件进行读写。但不经内核缓冲区,直接写磁盘,必然会引起阻塞。所以通常DIRECT-io与AIO(异步IO)会一起出现。

二. I/O模型的类比

Linux下的I/O模型。

如图每个 I/O 模型都有自己的使用模式,它们对于特定的应用程序都有自己的优点。下面一一来介绍:

2.1 同步阻塞 I/O 模型

在这个模型中,用户空间的应用程序执行一个系统调用,这会导致应用程序阻塞。这意味着应用程序会一直阻塞,直到系统调用完成为止(数据传输完成或发生错误)。调用应用程序处于一种不再消费 CPU 而只是简单等待响应的状态,因此从处理的角度来看,这是非常有效的。

2.2 同步非阻塞 I/O

同步阻塞 I/O 的一种效率稍低的变种是同步非阻塞 I/O。在这种模型中,设备是以非阻塞的形式打开的。这意味着 I/O 操作不会立即完成,read操作可能会返回一个错误代码,说明这个命令不能立即满足(EAGAIN 或 EWOULDBLOCK)

2.3 异步阻塞 I/O

IO多路复用(复用的select线程)。I/O复用模型会用到select、poll、epoll函数,这几个函数也会使进程阻塞,但是和阻塞I/O所不同的的,这两个函数可以同时阻塞多个I/O操作。而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。对于每个提示符来说,我们可以获取这个描述符可以写数据、有读数据可用以及是否发生错误的通知。

epoll总结:

epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就需态,并且只会通知一次。还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知.

epoll的优点:

  • 没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)。

  • 效率提升,不是轮询的方式,只管你“活跃”的连接,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数。

  • 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。

表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。

2.4 异步非阻塞 I/O(AIO)

异步非阻塞 I/O 模型是一种CPU处理与 I/O 重叠进行的模型。读请求会立即返回,说明 read 请求已经成功发起了。在后台完成读操作时,应用程序然后会执行其他处理操作。当 read 的响应到达时,就会产生一个信号或执行一个基于线程的回调函数来完成这次 I/O 处理过程。

三. AIO编程介绍:

aio异步读写是在Linux内核2.6之后才正式纳入其标准。之所以会增加此模块,是因为众所周知我们计算机CPU的执行速度远大于I/O读写的执行速度,如果我们用传统的阻塞式或非阻塞式来操作I/O的话,那么我们在同一个程序中(不用多线程或多进程)就不能同时操作俩个以上的文件I/O,每次只能对一个文件进行I/O操作,很明显这样效率很低下(因为CPU速度远大于I/O操作的速度,所以当执行I/O时,CPU其实还可以做更多的事)。因此就诞生了相对高效的异步I/O。
在 Linux 系统上有三种方式来实现 AIO:

  • 内核系统调用
  • 对内核系统调用进行封装进而在用户空间提供服务,例如 libaio
  • 完全在用户空间实现 AIO,并且不使用内核支持,例如librt和部分libc

3.1 内核系统调用

内核提供的API有io_setup, io_submit, io_getevents, io_destroy,
所使用的参数是 aio_context_t 而非libaio中的 io_context_t
一般不采用此方式直接调用,一般使用的是libaio库进行调用。

3.2 libaio

libaio提供的API有:io_setup, io_submit, io_getevents, io_destroy

使用时需要独立安装相关的 libaio-devel 开发库
yum install libaio-devel

3.2.1 相关结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct iocb {        // 描述IO请求

void *data; /* Return in the io completion event */
unsigned key; /*r use in identifying io requests */
short aio_lio_opcode;  // 操作的类型:IO_CMD_PWRITE | IO_CMD_PREAD
short aio_reqprio;
int aio_fildes;    // 操作的文件fd
union {
struct io_iocb_common c;
struct io_iocb_vector v;
struct io_iocb_poll poll;
struct io_iocb_sockaddr saddr;
} u;
};

struct io_iocb_common {  
void *buf;  
unsigned long nbytes;
long long offset;
unsigned flags;
unsigned resfd;
};

  struct io_event {    // 描述返回结果
    void *data;
    struct iocb *obj;  // 提交的任务
    unsigned long res;  // IO任务完成的状态
   unsigned long res2;  // 同上
};

3.2.2 应用步骤

3.2.2.1 建立IO任务
1
int io_setup (int maxevents, io_context_t *ctxp);

io_context_t对应内核中一个结构,为异步IO请求提供上下文环境。
注意: 在调用 io_setup 前必须将 io_context_t 初始化为0。
也需要open打开文件的时候设置 O_DIRECT 标志。

3.2.2.2 提交IO任务
1
long io_submit (io_context_t ctx_id, long nr, struct iocb **iocbpp);

提交任务之前必须先填充iocb结构体,libaio提供的包装函数说明了需要完成的工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
  memset(iocb, 0, sizeof(*iocb));
  iocb->aio_fildes = fd;
  iocb->aio_lio_opcode = IO_CMD_PREAD;
  iocb->aio_reqprio = 0;
  iocb->u.c.buf = buf;
  iocb->u.c.nbytes = count;
  iocb->u.c.offset = offset;
}

void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
  memset(iocb, 0, sizeof(*iocb));
  iocb->aio_fildes = fd;
  iocb->aio_lio_opcode = IO_CMD_PWRITE;
  iocb->aio_reqprio = 0;
  iocb->u.c.buf = buf;
  iocb->u.c.nbytes = count;
  iocb->u.c.offset = offset;
}

这里注意读写的buf都必须是按扇区对齐的,可以用posix_memalign来分配。

详细代码内容可参考 https://pagure.io/libaio/blob/master/f/src/libaio.h

3.2.2.3 获取完成的IO

io_getevents 系统调用尝试从 ctx_id 指定的AIO上下文的完成队列中读取至少min_nr, 最多nr个事件,
timeout指定等待IO完成的超时时间,为NULL时一直等待到至少看到min_nr个事件为止;

1
long io_getevents (io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

这里最重要的就是提供一个io_event数组给内核来copy完成的IO请求到这里,数组的大小是io_setup时指定的maxevents。

3.2.2.4 销毁IO任务
1
int io_destroy (io_context_t ctx);

3.2.3 libaio和epoll的结合

在异步编程中,任何一个环节的阻塞都会导致整个程序的阻塞,所以一定要避免在 io_getevents 调用时阻塞式的等待。还记得 io_iocb_common 中的 flags 和 resfd 吗?看看libaio是如何提供io_getevents和事件循环的结合:

1
2
3
4
void io_set_eventfd(struct iocb *iocb, int eventfd) {
iocb->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
iocb->u.c.resfd = eventfd;
}

这里的resfd是通过系统调用eventfd生成的。
int eventfd(unsigned int initval, int flags);
eventfd 函数是linux 2.6.22内核之后加进来的syscall,作用是内核用来通知应用程序发生的事件的数量,从而使应用程序不用频繁地去轮询内核是否有时间发生,而是有内核将发生事件的数量写入到该fd,应用程序发现fd可读后,从fd读取该数值,并马上去内核读取。

有了eventfd,就可以很好地将libaio和epoll事件循环结合起来:

1. 创建一个eventfd
    efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

2. 将eventfd设置到iocb中, 将eventfd和iocb进行关联
    io_set_eventfd(iocb, efd);

3. 交接AIO请求
    io_submit(ctx, NUM_EVENTS, iocb);

4. 创建一个epollfd,并将eventfd加到epoll中
    epfd = epoll_create(1);
    epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent);
    epoll_wait(epfd, &epevent, 1, -1);

5. 当 `eventfd` 可读时,从 `eventfd` 读出完成IO请求的数量,并调用 `io_getevents` 获取这些IO
    read(efd, &finished_aio, sizeof(finished_aio);
    r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);

3.2.4 epoll/aio/eventfd结合使用的简单例子

make时需要增加-laio 库引用
g++ main.cpp -o aiolib -laio

main.cpp内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#include <stdio.h>
#include <errno.h>
#include <libaio.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <inttypes.h>

#define TEST_FILE "aio_test_file"
#define TEST_FILE_SIZE (127 * 1024)
#define NUM_EVENTS 128
#define ALIGN_SIZE 512
#define RD_WR_SIZE 1024

struct custom_iocb {
struct iocb iocb;
int nth_request;
};

// aio 完成后的回调函数
void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2) {
struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n",
iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",
iocb->u.c.offset, iocb->u.c.nbytes, res, res2);
}

/**
* eventfd 被epoll触发后的事件处理
* @param ctx io操作上下文
* @param efd 被epoll触发的eventfd
* @return >0 返回完成的事件个数 <0 失败
*/
static int epoll_eventfd_hanlder(io_context_t ctx, int efd) {
struct io_event events[64]; //事件列表, 每次最多处理64个
struct timespec tms;
uint64_t finished_aio = 0;
if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) { // 读取当前完成的aio读取事件个数
perror("read");
return -1;
}
int iret = (int)finished_aio;

printf("finished io number: %ld\n", finished_aio);
while (finished_aio > 0) { // 循环处理所有已完成aio事件
tms.tv_sec = 0;
tms.tv_nsec = 0;
int r = io_getevents(ctx, 1, 64, events, &tms); // 每次最多处理64个任务
if (r > 0) {
printf("@@@ get io_event num=%d,\n", r);
for (int j=0; j<r; ++j) {
((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2);
}
finished_aio -= r;
}
}
return iret;
}

int main(int argc, char *argv[]) {
int efd=-1, fd=-1, epfd=-1;
io_context_t ctx;
struct custom_iocb iocbs[NUM_EVENTS];
struct iocb *iocbps[NUM_EVENTS];
struct custom_iocb *iocbp;
int i, j;
void *buf;
struct epoll_event epevent;

if ( 0>(efd=eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)) ) {
perror("eventfd");
return 2;
}

if (0>(fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644))) { //注意添加O_DIRECT
perror("open");
return 3;
}
ftruncate(fd, TEST_FILE_SIZE);

memset(&ctx, 0, sizeof(ctx)); // 注意必须初始化为0

if (io_setup(8192, &ctx)) {
perror("io_setup");
return 4;
}

if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) { // 申请内存对齐的空间,类似malloc 需调用free释放空间
perror("posix_memalign");
return 5;
}
printf("buf: %p\n", buf);

for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
iocbps[i] = &iocbp->iocb;
io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
io_set_eventfd(&iocbp->iocb, efd);
io_set_callback(&iocbp->iocb, aio_callback);
iocbp->nth_request = i + 1;
}

if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
perror("io_submit");
return 6;
}

if (0>(epfd = epoll_create(1))) {
perror("epoll_create");
return 7;
}

epevent.events = EPOLLIN | EPOLLET;
epevent.data.ptr = NULL;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
perror("epoll_ctl");
return 8;
}

i = 0;
while (i < NUM_EVENTS) { //如果是服务端,此处应该一直循环
if (1 != epoll_wait(epfd, &epevent, 1, -1)) {
perror("epoll_wait");
return 9;
}
int iok = 0;
if(0<(iok=epoll_eventfd_hanlder(ctx, efd))) {
i += iok;
} else {
return 10;
}
}

close(epfd);
free(buf);
io_destroy(ctx);
close(fd);
close(efd);
remove(TEST_FILE);
return 0;
}

3.3 POSIX AIO

当前的Linux POSIX AIO由glibc在用户空间中实现。POSIX AIO 有许多限制,最值得注意的是,维护多个线程来执行I/O操作是昂贵的,并且可扩展性很差。在基于内核状态机的 AIO 实现上已经进行了一段时间的工作,但是这个实现还没有成熟到可以使用内核系统调用完全重新实现POSIX AIO实现的程度。

3.3.1 函数说明

API函数 函数说明
aio_read 异步读操作
aio_write 异步写操作
aio_error 检查异步请求的状态
aio_return 获得异步请求完成时的返回值
aio_suspend 挂起调用进程,直到一个或多个异步请求已完成
aio_cancel 取消异步请求
lio_list 发起一系列异步I/O请求

3.3.2 库

1
librt libc 

linux内核中aiocd结构体原型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <aiocb.h>

struct aiocb {
/* The order of these fields is implementation-dependent */

int aio_fildes; /* File descriptor */
off_t aio_offset; /* File offset */
volatile void *aio_buf; /* Location of buffer */
size_t aio_nbytes; /* Length of transfer */
int aio_reqprio; /* Request priority */
struct sigevent aio_sigevent; /* Notification method */
int aio_lio_opcode; /* Operation to be performed;
lio_listio() only */

/* Various implementation-internal fields not shown */
};
/* Operation codes for 'aio_lio_opcode': */
enum { LIO_READ, LIO_WRITE, LIO_NOP };
  • aio_filedes 要执行io操作的文件描述符
  • aio_offset 要执行io操作的文件的偏移量
  • aio_buf 读写传输数据的缓冲区空间
  • aio_nbytes 指定的aio_buf缓冲区的大小
  • aio_reqprio 该字段指定从调用线程的实时优先级中减去的值,以确定执行此I / O请求的优先级(请参阅pthread_setschedparam(3))。 指定的值必须在0到sysconf(_SC_AIO_PRIO_DELTA_MAX)返回的值之间。 对于文件同步操作,将忽略此字段。
  • aio_sigevent 指定异步io操作完成时的通知方式, aio_sigevent.sigev_notify 的值可能是SIGEV_NONE, SIGEV_SIGNAL和 SIGEV_THREAD,相信信息可参考 sigevent(7)
  • aio_lio_opcode 要执行的操作类型; 仅用于lio_listio

3.3.3 API详细介绍及实例:

1. aio_read()

aio_read 函数请求对一个有效的文件描述符进行异步读操作。这个文件描述符可以表示一个文件、套接字甚至管道。aio_read 函数的原型如下:

int aio_read(struct aiocb *aiocbp);

aio_read 函数在请求进行排队之后会立即返回。如果执行成功,返回值就为 0;如果出现错误,返回值就为 -1,并设置 errno 的值。

2. aio_error()

aio_error 函数被用来确定请求的状态。

int aio_error( struct aiocb *aiocbp );

返回值:
EINPROGRESS,说明请求尚未完成

ECANCELLED,说明请求被应用程序取消了
-1,说明发生了错误,具体错误原因可以查阅 errno
0 ,说明完成当前请求

3. aio_return()

异步 I/O 和标准块 I/O 之间的另外一个区别是我们不能立即访问这个函数的返回状态,因为我们并没有阻塞在 read 调用上。在标准的 read 调用中,返回状态是在该函数返回时提供的。但是在异步 I/O 中,我们要使用 aio_return 函数。这个函数的原型如下:

ssize_t aio_return( struct aiocb *aiocbp );

对以上三个API的源码实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<unistd.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<fcntl.h>
#include<aio.h>

#define BUFFER_SIZE 1024
int MAX_LIST = 2;
int main(int argc,char **argv){
struct aiocb rd; //aio操作所需结构体
int fd,ret,couter;

fd = open("test.txt",O_RDONLY, 0644);
if(fd < 0) {
perror("test.txt");
}
//将rd结构体清空
bzero(&rd,sizeof(rd));

rd.aio_buf = malloc(BUFFER_SIZE + 1); //为rd.aio_buf分配空间
//填充rd结构体
rd.aio_fildes = fd;
rd.aio_nbytes = BUFFER_SIZE;
rd.aio_offset = 0;

ret = aio_read(&rd); //进行异步读操作
if(ret < 0) {
perror("aio_read");
exit(1);
}
//do other things

couter = 0;
// 循环等待异步读操作结束
while(aio_error(&rd) == EINPROGRESS) { //一直不停的查询(消耗cpu)
printf("第%d次\n",++couter);
}

//获取异步读返回值
ret = aio_return(&rd);
printf("\n\n返回值为:%d\n",ret);
printf("%s\n",rd.aio_buf);
free((char*)rd.aio_buf);
close(fd);
return 0;
}
4. aio_write()

aio_write 函数用来请求一个异步写操作。其函数原型如下:

intaio_write( struct aiocb *aiocbp );

aio_write 函数会立即返回,说明请求已经进行排队(成功时返回值为 0,失败时返回值为 -1,并相应地设置 errno)。

这与 read 系统调用类似,但是有一点不一样的行为需要注意。回想一下对于 read 调用来说,要使用的偏移量是非常重要的。然而,对于 write 来说,这个偏移量只有在没有设置 O_APPEND 选项的文件上下文中才会非常重要。如果设置了 O_APPEND,那么这个偏移量就会被忽略,数据都会被附加到文件的末尾。否则,aio_offset 域就确定了数据在要写入的文件中的偏移量.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include<stdio.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<unistd.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<fcntl.h>
#include<aio.h>

#define BUFFER_SIZE 1024

int main(int argc,char **argv) {
//定义aio控制块结构体
struct aiocb wr;
int ret,fd;
char str[20] = {"hello,world"};
bzero(&wr,sizeof(wr));//置零wr结构体

fd = open("test.txt",O_WRONLY | O_APPEND, 0644);
if(fd < 0) {
perror("test.txt");
}

//为aio.buf申请空间
wr.aio_buf = (char *)malloc(BUFFER_SIZE);
if(wr.aio_buf == NULL) {
perror("buf");
}

wr.aio_buf = str;

//填充aiocb结构
wr.aio_fildes = fd;
wr.aio_nbytes = 1024;

ret = aio_write(&wr); //异步写操作
if(ret < 0) {
perror("aio_write");
}

//等待异步写完成
while(aio_error(&wr) == EINPROGRESS){
printf("hello,world\n");
}

//获得异步写的返回值
ret = aio_return(&wr);
printf("\n\n\n返回值为:%d\n",ret);
free((char*)wr.aio_buf);
close(fd);
return 0;
}
5. aio_suspend()
  • aio_suspend函数可以时当前进程挂起,直到有向其注册的异步事件完成为止
  • 阻塞
  • 当有AIO请求返程后,该函数返回

int aio_suspend(const struct aiocb *const cblist[],int n,const struct timespec *timeout);

第一个参数是个保存了aiocb块地址的数组,我们可以向其内添加想要等待阻塞的异步事件,第二个参数为向cblist注册的aiocb个数,第三个参数为等待阻塞的超时时间,NULL为无限等待.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include<stdio.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<unistd.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<fcntl.h>
#include<aio.h>

#define BUFFER_SIZE 1024

int MAX_LIST = 2;

int main(int argc,char **argv) {
//aio操作所需结构体
struct aiocb rd;
struct aiocb wd;
int fd,ret,couter;

struct aiocb *aiocb_list[2];//cblist链表

fd = open("test.txt",O_RDONLY, 0644);
if(fd < 0) {
perror("test.txt");
}

bzero(&rd,sizeof(rd));//将rd结构体清空

rd.aio_buf = malloc(BUFFER_SIZE + 1);//为rd.aio_buf分配空间

//填充rd结构体
rd.aio_fildes = fd;
rd.aio_nbytes = BUFFER_SIZE;
rd.aio_offset = 0;

//将读fd的事件注册
aiocb_list[0] = &rd;

//进行异步读操作
ret = aio_read(&rd);
if(ret < 0) {
perror("aio_read");
exit(1);
}

printf("我要开始等待异步读事件完成\n");
ret = aio_suspend(aiocb_list,MAX_LIST,NULL); //阻塞等待异步读事件完成
ret = aio_return(&rd);//获取异步读返回值
printf("\n\n返回值为:%d\n",ret);
free((char*)rd.aio_buf);
close(fd);
return 0;
}
6.lio_listio()

aio同时还为我们提供了一个可以发起多个或多种I/O请求的接口lio_listio

这个函数效率很高,因为我们只需一次系统调用(一次内核上下位切换)就可以完成大量的I/O操作

int lio_listio(int mode,struct aiocb *list[],int nent,struct sigevent *sig);

第一个参数mode可以有两个实参,LIO_WAITLIO_NOWAIT,前一个会阻塞该调用直到所有I/O都完成为止,后一个则会挂入队列就返回

  • LIO_WAIT 阻塞发起
  • LIO_NOWAIT 非阻塞发起

    批量发起AIO的两种方法

  • 阻塞等到所有发起的AIO全部完成后,才会返回
  • 发起后立即返回,通过绑定的信号来通知

LIO_WAIT demo::

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include<stdio.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<unistd.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<fcntl.h>
#include<aio.h>

#define BUFFER_SIZE 1025
int MAX_LIST = 2;
int main(int argc,char **argv)
{
struct aiocb *listio[2];
struct aiocb rd,wr;
int fd,ret;

//异步读事件
fd = open("test1.txt",O_RDONLY, 0644);
if(fd < 0) {
perror("test1.txt");
}

bzero(&rd,sizeof(rd));

rd.aio_buf = (char *)malloc(BUFFER_SIZE);
if(rd.aio_buf == NULL) {
perror("aio_buf");
}

rd.aio_fildes = fd;
rd.aio_nbytes = 1024;
rd.aio_offset = 0;
rd.aio_lio_opcode = LIO_READ; ///lio操作类型为异步读

listio[0] = &rd;//将异步读事件添加到list中

fd = open("test2.txt",O_WRONLY | O_APPEND, 0644); //异步些事件
if(fd < 0) {
perror("test2.txt");
}

bzero(&wr,sizeof(wr));

wr.aio_buf = (char *)malloc(BUFFER_SIZE);
if(wr.aio_buf == NULL) {
perror("aio_buf");
}

wr.aio_fildes = fd;
wr.aio_nbytes = 1024;
wr.aio_lio_opcode = LIO_WRITE; ///lio操作类型为异步写

//将异步写事件添加到list中
listio[1] = &wr;

ret = lio_listio(LIO_WAIT,listio,MAX_LIST,NULL); //使用lio_listio发起一系列请求
ret = aio_return(&rd); //当异步读写都完成时获取他们的返回值
printf("\n读返回值:%d",ret);

ret = aio_return(&wr);
printf("\n写返回值:%d",ret);
free((char*)wr.aio_buf);
free((char*)rd.aio_buf);
close(rd.aio_fildes);
close(wr.aio_fildes);
return 0;
}

LIO_NOWAIT demo::

当我们的异步IO操作完成之时,我们可以通过信号通知我们的进程也可用回调函数来进行异步通知,接下来我会为大家主要介绍以下回调函数来进行异步通知

1
2
3
4
5
6
7
struct sigevent {
int sigev_notify; //Notification type.
int sigev_signo; //Signal number.
union sigval sigev_value; //Signal value.
void (*sigev_notify_function)(union sigval); //Notification function.
pthread_attr_t *sigev_notify_attributes; //Notification attributes.
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include<stdio.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<unistd.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<fcntl.h>
#include<aio.h>
#include<unistd.h>

#define BUFFER_SIZE 1025

void aio_completion_handler(sigval_t sigval)
{
//用来获取读aiocb结构的指针
struct aiocb *prd;
int ret;

prd = (struct aiocb *)sigval.sival_ptr;
printf("hello\n");
//获取返回值
ret = aio_return(prd);
free(prd->aio_buf);
close(prd->aio_fildes);
}

int main(int argc,char **argv)
{
int fd,ret;
struct aiocb rd;

fd = open("test.txt",O_RDONLY);
if(fd < 0) {
perror("test.txt");
}

bzero(&rd,sizeof(rd));//填充aiocb的基本内容

rd.aio_fildes = fd;
rd.aio_buf = (char *)malloc(sizeof(BUFFER_SIZE + 1));
rd.aio_nbytes = BUFFER_SIZE;
rd.aio_offset = 0;

//填充aiocb中有关回调通知的结构体sigevent
rd.aio_sigevent.sigev_notify = SIGEV_THREAD;//使用线程回调通知
rd.aio_sigevent.sigev_notify_function = aio_completion_handler;//设置回调函数
rd.aio_sigevent.sigev_notify_attributes = NULL;//使用默认属性
rd.aio_sigevent.sigev_value.sival_ptr = &rd;//在aiocb控制块中加入自己的引用

//异步读取文件
ret = aio_read(&rd);
if(ret < 0) {
perror("aio_read");
}

printf("异步读以开始\n");
sleep(1);
printf("异步读结束\n");
return 0;
}

四. 参考

https://www.ibm.com/developerworks/cn/linux/l-async/index.html
https://blog.csdn.net/abraham_1/article/details/79824350
https://man7.org/linux/man-pages/man7/aio.7.html
http://guleilab.com/2019/01/30/linux-aio/
https://www.cnblogs.com/chenny7/p/4362910.html
https://pagure.io/libaio/blob/master/f/src