Linux的I/O机制经历了几个阶段,包括:
1 .同时块I/O:用户进程执行I/O操作,直到I/O操作完成。
2 .同步无阻塞I/O:用户程序可以通过设置文件描述符的属性O_NONBLOCK来立即恢复I/O操作,但不能保证I/O操作成功。
3 .异步事件阻止I/O:用户进程可以阻止I/O事件,但不阻止I/O操作。 通过函数调用(如select/poll/epoll )实现此目的。
4 .异步时间异步阻塞I/O:也称为异步I/O(AIO ),用户程序通过向内核发出I/O请求命令,可以继续进行其他操作,而无需等待I/O事件的实际发生这将大大提高系统的吞吐量。
图1显示了同步和异步模型,以及块和非块模型。
图1 .基本Linux I/O模型的简单矩阵
在linux下有一个aio包。 aio_*系列的调用由glibc提供。 这是glibc在线程上阻止和模拟调用的结果,性能很差。 可以使用更低级别的libaio来控制更多的io行为。
libaio项目:http://OSS.Oracle.com/projects/libaio-Oracle /
使用libaio并不复杂,流程是初始化libaio、发出和回收io请求、销毁libaio
一、libaio接口
libaio有五个主要的API函数:
intio_setup(intmaxevents,io_context_t *ctxp );
intio_destroy(io_context_tctx;
intio_submit(io_context_tCTX,long nr,struct iocb *ios[];
intio_cancel(io_context_tCTX,struct iocb *iocb,struct io_event *evt );
intio _ get events (io _ context _ tctx _ id,long min_nr,long nr,struct io_event *events,structtimespec*time
定义五个宏:
void io _ set _ callback (结构iocb * iocb,io_callback_t cb );
void io _ prep _ pwrite (struct iocb * iocb,int fd,void *buf,size_t count,long long offset );
void io _ prep _ pread (struct iocb * iocb,int fd,void *buf,size_t count,long long offset );
void io _ prep _ pwritev (struct iocb * iocb,int fd,const struct iovec *iov,int iovcnt,long long offset );
void io _ prep _ preadv (struct iocb * iocb,int fd,const struct iovec *iov,int iovcnt,long long offset );
这5个宏观定义都是操作struct iocb的结构体。 struct IOcb是libaio的重要结构,表示io,但其结构稍复杂,不建议直接操作该元素以保持封装性,因此在上面的五个宏中定义操作。
二、libaio的初始化和销毁
观察libaio的五个主要API时,使用了io_context类型的变量。 此变量是libaio的工作区。 具体地说,你不需要知道这个变量的结构,只需要知道它的操作就可以了。 分别使用io_setup(io_queue_init )创建和销毁libaio。 区别只是名字不同。 )和io_destroy。
intio_setup(intmaxevents,io_context_t *ctxp );
这里也需要open需要操作的文件。 请注意O_DIRECT标志的设置
intio_destroy(io_context_tctx;三、libaio读写请求的下发和回收
1 .请求提交
libaio的读写请求都是用io_submit发出的。 发布前用io_prep_pwrite和io_prep_pread生成iocb的结构体,作为io_submit的参数。 此结构指定了读写类型、开始扇区、长度和设备标记。
libaio的初始化不是针对特定设备,而是创建libaio的工作环境。 读写请求发送到哪个设备由在open函数中打开的设备标记指定。
2 .寻求返回
发出读写请求后,使用io_getevents函数等待io终止信号:
int io
_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);io_getevents返回events的数组,其参数events为数组首地址,nr为数组长度(即最大返回的event数),min_nr为最少返回的events数。timeout可填NULL表示无等待超时。io_event结构体的声明为:
struct io_event {
PADDEDptr(void *data, __pad1);
PADDEDptr(struct iocb *obj, __pad2);
PADDEDul(res, __pad3);
PADDEDul(res2, __pad4);
};
其中,res为实际完成的字节数;res2为读写成功状态,0表示成功;
obj为之前下发的struct iocb结构体。这里有必要了解一下struct iocb这个结构体的主要内容:
iocbp->iocb.u.c.nbytes 字节数
iocbp->iocb.u.c.offset 偏移
iocbp->iocb.u.c.buf 缓冲空间
iocbp->iocb.u.c.flags 读写
3. 自定义字段
struct iocb除了自带的元素外,还留有供用户自定义的元素,包括回调函数和void *的data指针。如果在请求下发前用io_set_callback绑定用户自定义的回调函数,那么请求返回后就可以显示的调用该函数。回调函数的类型为:
void callback_function(io_context_t ctx, struct iocb *iocb, long res, long res2);
另外,还可以通过iocbp->data指针挂上用户自己的数据。
注意:实际使用中发现回调函数和data指针不能同时用,可能回调函数本身就是使用的data指针
#include <stdlib.h>
#include <stdio.h>
#include <libaio.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <libaio.h>
int srcfd=-1;
int odsfd=-1;
#define AIO_BLKSIZE 1024
#define AIO_MAXIO 64
static void wr_done(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
if(res2 != 0)
{
printf("aio write errorn");
}
if(res != iocb->u.c.nbytes)
{
printf( "write missed bytes expect %d got %dn", iocb->u.c.nbytes, res);
exit(1);
}
free(iocb->u.c.buf);
free(iocb);
}
static void rd_done(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
/*library needs accessors to look at iocb*/
int iosize = iocb->u.c.nbytes;
char *buf = (char *)iocb->u.c.buf;
off_t offset = iocb->u.c.offset;
int tmp;
char *wrbuff = NULL;
if(res2 != 0)
{
printf("aio readn");
}
if(res != iosize)
{
printf( "read missing bytes expect %d got %d", iocb->u.c.nbytes, res);
exit(1);
}
/*turn read into write*/
tmp = posix_memalign((void **)&wrbuff, getpagesize(), AIO_BLKSIZE);
if(tmp < 0)
{
printf("posix_memalign222n");
exit(1);
}
snprintf(wrbuff, iosize + 1, "%s", buf);
printf("wrbuff-len = %d:%sn", strlen(wrbuff), wrbuff);
printf("wrbuff_len = %dn", strlen(wrbuff));
free(buf);
io_prep_pwrite(iocb, odsfd, wrbuff, iosize, offset);
io_set_callback(iocb, wr_done);
if(1!= (res=io_submit(ctx, 1, &iocb)))
printf("io_submit write errorn");
printf("nsubmit %d write requestn", res);
}
void main(int args,void * argv[])
{
int length = sizeof("abcdefg");
char * content = (char * )malloc(length);
io_context_t myctx;
int rc;
char * buff=NULL;
int offset=0;
int num,i,tmp;
if(args<3)
{
printf("the number of param is wrongn");
exit(1);
}
if((srcfd=open(argv[1],O_RDWR))<0)
{
printf("open srcfile errorn");
exit(1);
}
printf("srcfd=%dn",srcfd);
lseek(srcfd,0,SEEK_SET);
write(srcfd,"abcdefg",length);
lseek(srcfd,0,SEEK_SET);
read(srcfd,content,length);
printf("write in the srcfile successful,content is %sn",content);
if((odsfd=open(argv[2],O_RDWR))<0)
{
close(srcfd);
printf("open odsfile errorn");
exit(1);
}
memset(&myctx, 0, sizeof(myctx));
io_queue_init(AIO_MAXIO, &myctx);
struct iocb *io = (struct iocb*)malloc(sizeof(struct iocb));
int iosize = AIO_BLKSIZE;
tmp = posix_memalign((void **)&buff, getpagesize(), AIO_BLKSIZE);
if(tmp < 0)
{
printf("posix_memalign errorn");
exit(1);
}
if(NULL == io)
{
printf( "io out of memeoryn");
exit(1);
}
io_prep_pread(io, srcfd, buff, iosize, offset);
io_set_callback(io, rd_done);
printf("START鈥nn");
rc = io_submit(myctx, 1, &io);
if(rc < 0)
printf("io_submit read errorn");
printf("nsubmit %d read requestn", rc);
//m_io_queue_run(myctx);
struct io_event events[AIO_MAXIO];
io_callback_t cb;
num = io_getevents(myctx, 1, AIO_MAXIO, events, NULL);
printf("n%d io_request completednn", num);
for(i=0;i<num;i++)
{
cb = (io_callback_t)events[i].data;
struct iocb *io = events[i].obj;
printf("events[%d].data = %x, res = %d, res2 = %dn", i, cb, events[i].res, events[i].res2);
cb(myctx, io, events[i].res, events[i].res2);
}
}