本文主要研究下Windows下的IO重叠技术。
何为IO重叠?按照它的定义,在同一个线程中,我们同时向多个目标socket发送数据或者从多个socket接收数据,而我们用于传递和接收的函数未等IO过程结束就返回,而数据的传递和接收工作完全交给了操作系统,在这个过程中针对目标socket的每个IO操作在时间片上都发生了重叠,这就是IO重叠,如下图所示:
使用IO重叠技术时,我们不能再使用普通的套接字,而是需要使用WSASocket函数创建支持IO重叠的套接字。WSASocket的原型如下:
SOCKET WSASocket( __in int af, __in int type, __in int protocol, __in LPWSAPROTOCOL_INFO lpProtocolInfo, __in GROUP g, __in DWORD dwFlags );
af —— 协议族,和普通socket函数一致。
type —— 协议类型,和普通socket函数一致。
protocol —— 协议,和普通的socket函数一致。
lpProtocolInfo —— 包含创建套接字信息的WSAPROTOCOL_INFO结构体变量地址值,一般情况下为NULL即可。
g —— 保留字段,为0即可。
dwFlags —— 套接字的属性,我们创建的支持IO叠加的属性就是在这里进行设置,设置成WSA_FLAG_OVERLAPPED。
创建完成支持IO重叠的套接字之后,中间套接字的bind、listen以及accept和connect和普通的socket一致,不同的是我们发送和接收数据时需要改用WSASend和WSARecv,下面就这两个函数进行介绍。
WSASend函数的原型如下:
int WSASend( __in SOCKET s, __in LPWSABUF lpBuffers, __in DWORD dwBufferCount, __out LPDWORD lpNumberOfBytesSent, __in DWORD dwFlags, __in LPWSAOVERLAPPED lpOverlapped, __in LPWSAOVERLAPPED_COMPLETION_ROUTINElpCompletionRoutine );
s —— 要发送数据的目标套接字。
lpBuffers—— WSABUF结构体指针。与普通的send函数不同,WSASend将要传递的数据封装在WSABUF类型的结构体中,其原型如下:
typedef struct__WSABUF { u_long len; char FAR *buf; } WSABUF,*LPWSABUF;
buf是我们要发送的数据数组,len是该数组的长度。lpBuffers是WSABUF类型的数组指针,也就是说通过封装,WSASend可以一次性发送多个buffer,而非send函数的每次只能发送一个。
dwBufferCount——lpBuffers数组的元素个数。
lpNumberOfBytesSent—— 实际发送的数据的字节数。这里可能部分童鞋会有疑问:既然WSASend发送数据时不会发生阻塞,会立即返回,为什么这里还能接收到实际发送的字节数?
这要从WSASend发送数据的两种情况说起:
一、如果发送的数据很少,或者输出缓冲区为空,WSASend发送数据的效率将会很高,此时还没等WSASend函数返回,其实数据已经发送完成,这是实际发送的字节数将会被存到lpNumberOfBytesSent所指的内存中。
二、如果发送的数据很多,WSASend函数返回时,数据还没发送成功,这时WSASend将会返回WSA_IO_PENDING错误,我们可以通过WSAGetLastError函数获取到这个错误。然后调用WSAGetOverlappedResult获取最终的传输的数据字节数。WSAGetOverlappedResult的原型如下:
BOOL WSAAPI WSAGetOverlappedResult( __in SOCKET s, __in LPWSAOVERLAPPED lpOverlapped, __out LPDWORD lpcbTransfer, __in BOOL fWait, __out LPDWORD lpdwFlags );
s是我们的目标socket。lpOverlapped是和socket关联的WSAOVERLAPPED类型变量的地址,稍后会解释这个结构体变量。lpcbTransfer用于存放保存实际传输字节变量的地址值。fWait为True时等待IO完成,为False时会直接返回False。lpdwFlags是调用WSARecv时用于获取附件信息,如OOB等一些通过socket优先信道传输的信息,一般为NULL即可。
dwFlags—— 用于更改数据的传输特性,如发送OOB模式的数据,传送数据时从优先信道传输额外的紧急度较高的附加数据,类似于高速公路的应急车道,一般情况下为0即可。
lpOverlapped—— 是一个WSAOVERLAPPED类型的结构体指针,WSAOVERLAPPED的定义如下:
typedef struct _WSAOVERLAPPED { ULONG_PTR Internal; ULONG_PTR InternalHigh; union { struct { DWORD Offset; DWORD OffsetHigh; } ; PVOID Pointer; } ; HANDLE hEvent; } WSAOVERLAPPED, *LPWSAOVERLAPPED;
它的前面四个成员我们不必关心,我们要关心的的它的hEvent成员,这是需要我们创建的non-signaled的auto-reset模式的事件句柄,基于事件的IO重叠模型中。如果完成IO时,这个事件所指的内核对象将被改变成signaled状态。我们在调用WSAGetOverlappedResult获取实际发送的字节之前需要先调用WSAWaitForMultipleEvents
等待IO结束,该事件的状态变成signaled。
lpCompletionRoutine—— 最后这个成员是一个函数指针,是用于确认IO完成状态的另一种方法,在本文设置成NULL即可。
上面就是WSASend函数所涉及到的相关知识,接下来介绍下WSARecv函数。
int WSARecv( __in SOCKET s, __inout LPWSABUF lpBuffers, __in DWORD dwBufferCount, __out LPDWORD lpNumberOfBytesRecvd, __inout LPDWORD lpFlags, __in LPWSAOVERLAPPED lpOverlapped, __in LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine );
WSARecv各个参数的含义和作用和WSASend基本一致,这里就不再赘述。下面是基于事件进行IO重叠的一个回声服务端的例子,稍后会对关键代码进行解释。
// WSASocketServ.cpp : 定?§义°?控?制?台??§应®|用®?程¨?序¨°的ì?入¨?口¨2点ì?。?ê // #include "stdafx.h" #include <winsock2.h> #include <stdio.h> #include <stdlib.h> #pragma comment(lib,"ws2_32.lib") #define BUF_SIZE 30 #define SOCKS_SIZE 64 void ErrorHandler(const char* message); void CompressSocks(SOCKET socks[],int pos,int size); void CompressOverlappeds(WSAOVERLAPPED peds[],intpos,int size); int _tmain(int argc, _TCHAR* argv[]) { WSADatawsaData; WSAStartup(MAKEWORD(2,2),&wsaData); SOCKETservSock,clntSock; SOCKADDR_INservAddr,clntAddr; int clntAddrSz; char buf[BUF_SIZE]; SOCKETsocks[SOCKS_SIZE]; HANDLEevents[SOCKS_SIZE]; unsigned long ul=1; int clntSockIndex; WSABUFwsaBuf; DWORDrecvBytes,sendBytes,flags=0; WSAOVERLAPPEDrdOverLappeds[SOCKS_SIZE]; WSAOVERLAPPEDwrOverLappeds[SOCKS_SIZE]; servSock=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED); if(servSock==INVALID_SOCKET) ErrorHandler("WSASocket Error"); memset(&servAddr,0,sizeof(servAddr)); servAddr.sin_family=AF_INET; servAddr.sin_addr.s_addr=htonl(INADDR_ANY); servAddr.sin_port=htons(atoi("8888")); if(bind(servSock,(SOCKADDR*)&servAddr,sizeof(servAddr))==SOCKET_ERROR) ErrorHandler("bind error"); if(listen(servSock,5)==SOCKET_ERROR) ErrorHandler("listen error"); //设|¨¨置?socket为a非¤?阻ᨨ塞¨?模?ê式o? ioctlsocket(servSock,FIONBIO,(unsigned long*)&ul); clntSockIndex= 0; while(1) { clntAddrSz=sizeof(clntAddr); clntSock=accept(servSock,(SOCKADDR*)&clntAddr,&clntAddrSz); if(clntSock!=INVALID_SOCKET) { socks[clntSockIndex]=clntSock; //存??储??é用®?于®¨2读¨¢的ì?WSAOVERLAPPED变à?量¢? HANDLEhRdEvent=WSACreateEvent(); WSAOVERLAPPEDrdOverLapped; memset(&rdOverLapped,0,sizeof(rdOverLapped)); rdOverLapped.hEvent=hRdEvent; rdOverLappeds[clntSockIndex]=rdOverLapped; //存??储??é用®?于®¨2写??的ì?WSAOVERLAPPED变à?量¢? HANDLEhWrEvent=WSACreateEvent(); WSAOVERLAPPEDwrOverLapped; memset(&wrOverLapped,0,sizeof(wrOverLapped)); wrOverLapped.hEvent=hWrEvent; wrOverLappeds[clntSockIndex]=wrOverLapped; clntSockIndex+= 1; } for (int i=0;i<clntsockindex;i++) {="" memset(&wsabuf,0,sizeof(wsabuf));="" wsabuf.buf="buf;" wsabuf.len="BUF_SIZE;" if(wsarecv(socks[i],&wsabuf,1,&recvbytes,&flags,&rdoverlappeds[i],null)="=SOCKET_ERROR)" if(wsagetlasterror()="=WSA_IO_PENDING)" puts("backgroud="" data="" recv");="" 当ì?à有®d客¨a户?ì端?连¢?接¨®后¨®,ê?下?面?这a句?代?¨2码?会¨¢发¤?é生|¨2阻ᨨ塞¨?,ê?这a个?服¤t务?端?一°?次??只?能¨1和¨a一°?个?客¨a户?ì端?进?行d会¨¢话??="" wsawaitformultipleevents(1,&rdoverlappeds[i].hevent,true,wsa_infinite,false);="" wsagetoverlappedresult(socks[i],&rdoverlappeds[i],&recvbytes,false,null);="" }="" else="" puts("wsagetlasterror="" error");="" continue;="" if(recvbytes="=0)" closesocket(socks[i]);="" compresssocks(socks,i,socks_size);="" compressoverlappeds(wroverlappeds,i,socks_size);="" compressoverlappeds(rdoverlappeds,i,socks_size);="" clntsockindex--;="" i--;="" if(wsasend(socks[i],&wsabuf,1,&sendbytes,flags,&wroverlappeds[i],null)="=SOCKET_ERROR)" send");="" wsawaitformultipleevents(1,&wroverlappeds[i].hevent,true,wsa_infinite,false);="" wsagetoverlappedresult(socks[i],&wroverlappeds[i],&sendbytes,false,null);="" closesocket(servsock);="" wsacleanup();="" return="" 0;="" void="" compresssocks(socket="" socks[],int="" pos,int="" size)="" while(pos
第38行,调用WSASocket创建支持IO重叠的socket。
第54行,由于不需要再accept时进行阻塞,所以要把套接字改成非阻塞模式。
第63~68行,创建用于WSARecv的事件、WSAOVERLAPPED对象并把它放到数组中。
第70~74行,创建用于WSASend的事件、WSAOVERLAPPED对象并把它放到数组中。
第82~83行,将buf赋值给wsaBuf,用于在WSARecv中接收数据。
第85行,调用WSARecv接收客户端传来的数据。
第87~92行,由于WSAGetLastError返回了WSA_IO_PENDING,说明数据还没有接收完成,调用WSAWaitForMultipleEvents等待数据接收完成,事件所指内核变成singnaled状态。然后调用WSAGetOverlappedResult获取实际接收的字节数,此时接收的数据将会被存放到buf中。
第110~111行,将buf和实际接收的字节数再次赋值给wsaBuf,用于在WSASend中发送。
第112行,调用WSASend向客户端发送buf中的数据。
第114~118行,由于WSAGetLastError返回了WSA_IO_PENDING,说明数据还没有发送完成,调用WSAWaitForMultipleEvents等待数据发送完成,事件所指内核变成singnaled状态。然后调用WSAGetOverlappedResult获取实际发送的字节数。
Github位置: https://github.com/HymanLiuTS/NetDevelopment 克隆本项目: gitclonegit@github.com:HymanLiuTS/NetDevelopment.git 获取本文源代码: gitcheckout NL54