频道栏目
首页 > 程序开发 > 综合编程 > 其他综合 > 正文
6-muduo网络库简介
2016-11-14 09:36:00         来源:gaoxiangnumber1's Blog  
收藏   我要投稿

Welcome to my github: https://github.com/gaoxiangnumber1

6.4 使用教程

muduo支持并发非阻塞TCP网络编程,每个IO线程是一个事件循环,把IO事件分发到回调函数上。

6.4.1 TCP网络编程本质论

传统模式:主动调用recv(2)接收数据,主动调用accept(2)接受新连接,主动调用send(2)发送数据。 基于事件的非阻塞网络编程:
-1- 注册一个收数据的回调,网络库收到数据会调用我,直接把数据提供给我。
-2- 注册一个接受连接的回调,网络库接受新连接会回调我,直接把新的连接对象传给我。
-3- 需要发送数据时,只管往连接中写,网络库会负责无阻塞地发送。 TCP网络编程的本质是处理三个半事件:
连接的建立,包括服务端接受(accept)新连接和客户端成功发起(connect)连接。TCP连接一旦建立,客户端和服务端是平等的,可以各自收发数据。 连接的断开,包括主动断开(close、shut-down)和被动断开(read(2)返回0)。 消息到达,文件描述符可读。这是最重要的事件,对它的处理方式决定了网络编程的风格(阻塞/非阻塞,如何处理分包,应用层的缓冲如何设计等等)。
3.5 消息发送完毕,这算半个事件。“发送完毕”是指将数据写入操作系统的缓冲区,将由TCP协议栈负责数据的发送与重传,不代表对方已经收到数据。低流量的服务可以不关心这个事件。

其中的难点

如果要主动关闭连接,如何保证对方已经收到全部数据? 如果应用层有缓冲(这在非阻塞网络编程中是必需的),如何保证先发送完缓冲区中的数据,然后再断开连接? 如果主动发起连接,但对方主动拒绝,如何定期(带back-off地)重试? 非阻塞网络编程该用边沿触发(edge trigger)还是电平触发(level trigger)?如果是电平触发,那么什么时候关注EPOLLOUT事件?会不会造成busy-loop?如果是边沿触发,如何防止漏读造成的饥饿?epoll(4)一定比poll(2)快吗?

非阻塞网络编程中,为什么要使用应用层发送缓冲区?

假设应用程序需要发送40kB数据,但是操作系统的TCP发送缓冲区只有25kB剩余空间,那么剩下的15kB数据怎么办?如果等待OS缓冲区可用,会阻塞当前线程,因为不知道对方什么时候收到并读取数据。因此网络库应该把这15kB数据缓存起来,放到这个TCP连接的应用层发送缓冲区中,等socket变成可写的时候立刻发送数据,这样“发送”操作不会阻塞。 如果应用程序随后又要发送50kB数据,而此时发送缓冲区中尚有未发送的数据(若干kB),那么网络库应该将这50kB数据追加到发送缓冲区的末尾,而不能立刻尝试write(),因为这样有可能打乱数据的顺序。

非阻塞网络编程中,为什么要使用应用层接收缓冲区?

假如一次读到的数据不够一个完整的数据包,那么这些已经读到的数据应该先暂存在某个地方,等剩余的数据收到之后再一并处理。 假如数据是一个字节一个字节地到达,间隔10ms,每个字节触发一次文件描述符可读(readable)事件,程序不能正常工作。

非阻塞网络编程中,如何设计并使用缓冲区?

一方面,希望减少系统调用,一次读的数据越多越划算,那么应该准备一个大的缓冲区。
另一方面,希望减少内存占用。如果有10000个并发连接,每个连接一建立就分配各50kB的读写缓冲区(s),将占用1GB内存,而大多数时候这些缓冲区的使用率很低。 muduo用readv(2)结合栈上空间解决了这个问题。

6.4.2 echo服务的实现

muduo的使用只需要注册回调函数去处理前面提到的三个半事件。以echo服务为例。 定义EchoServer class,不需要派生自任何基类。
//examples/simple/echo/echo.h
#include 

class EchoServer
{
public:
    EchoServer(muduo::net::EventLoop *loop,
               const muduo::net::InetAddress &listen_address);

    void Start(); // call server_.start()

private:
    void OnConnection(const muduo::net::TcpConnectionPtr &connection);

    void OnMessage(const muduo::net::TcpConnectionPtr &connection,
                   muduo::net::Buffer *buffer,
                   muduo::Timestamp time);

    muduo::net::TcpServer server_;
};
在构造函数里注册回调函数。
//examples/simple/echo/echo.cc
EchoServer::EchoServer(EventLoop *loop,
                       const InetAddress &listen_address)
    : server_(loop, listen_address, "EchoServer")
{
    server_.setConnectionCallback(bind(&EchoServer::OnConnection, this, _1));
    server_.setMessageCallback(bind(&EchoServer::OnMessage, this, _1, _2, _3));
}
实现EchoServer::onConnection()和EchoServer::onMessage()
//examples/simple/echo/echo.cc
void EchoServer::OnConnection(const TcpConnectionPtr &connection)
{
    LOG_INFO << "EchoServer - " << connection->peerAddress().toIpPort() << " -> "
             << connection->localAddress().toIpPort() << " is "
             << (connection->connected() ? "UP" : "DOWN");
}

void EchoServer::OnMessage(const TcpConnectionPtr &connection,
                           Buffer* buffer,
                           Timestamp time)
{
    string message(buffer->retrieveAllAsString()); // L37
    LOG_INFO << connection->name() << " echo " << message.size() << " bytes, "
             << "data received at " << time.toString();
    connection->send(message); // L40
}
L37和L40是echo的“业务逻辑”:把收到的数据原封不动地发回客户端。不用担心L40的send(msg)是否完整地发送数据,因为网络库会帮我们管理发送缓冲区。 这两个函数体现了“基于事件编程”的典型做法:程序主体是被动等待事件发生,事件发生之后网络库会回调事先注册的事件处理函数(event handler)。
在onConnection()函数中,conn参数是TcpConnection对象的shared_ptr;TcpConnection::connected()返回bool,表明目前连接是建立还是断开;TcpConnection的peerAddress()和localAddress()成员函数分别返回对方和本地的地址(以InetAddress对象表示的IP和port)。 在onMessage()函数中,conn参数是收到数据的那个TCP连接;buf是已经收到的数据,buf的数据会累积,直到用户从中取走(retrieve)数据。buf是指针,表明用户代码可以修改buffer;time是收到数据的确切时间,即epoll_wait(2)返回的时间,这个时间通常比read(2)发生的时间略早,可以用于正确测量程序的消息处理延迟。Timestamp对象采用pass-by-value,而不是pass-by-const-reference,因为在x86-64上可以直接通过寄存器传参。 在main()里用EventLoop让整个程序跑起来。
//examples/simple/echo/main.cc
#include "echo.h"
#include 
#include 

using muduo::net::EventLoop;
using muduo::net::InetAddress;

int main()
{
    LOG_INFO << "pid = " << getpid();
    EventLoop loop;
    InetAddress listen_address(7188);
    EchoServer server(&loop, listen_address);
    server.start();
    loop.loop();
}
这个程序实现了一个单线程并发的echo server,可以同时处理多个连接。

6.6详解muduo多线程模型

6.6.1数独求解服务器

Sudoku Solver可以看成是echo服务的一个变种。挑战在于怎样做才能发挥现在多核硬件的能力?先写一个基本的单线程版。

协议

一个简单的以\r\n分隔的文本行协议,使用TCP长连接,客户端在不需要服务时主动断开连接。
请求:[id:]<81digits>\r\n
响应:[id:]<81digits>\r\n
或者:[id:]NoSolution\r\n
[id:]表示可选的id,用于区分先后的请求,以支持Parallel Pipelining,响应中会回显请求中的id。Parallel Pipelining的意义见《以小见大——那些基于Protobuf的五花八门的RPC(2)》【P157-26】,或《分布式系统的工程化开发方法》【P157-27】第54页关于out-of-order RPC的介绍。 <81digits>是Sudoku的棋盘,9×9个数字,从左上角到右下角按行扫描,未知数字以0表示。如果Sudoku有解,那么响应是填满数字的棋盘;如果无解,则返回NoSolution。
例1请求:000000010400000000020000000000050407008000300001090000300400200050100000000806000\r\n
响应:693784512487512936125963874932651487568247391741398625319475268856129743274836159\r\n
例2请求:a:000000010400000000020000000000050407008000300001090000300400200050100000000806000\r\n
响应:a:693784512487512936125963874932651487568247391741398625319475268856129743274836159\r\n
例3请求:b:000000010400000000020000000000050407008000300001090000300400200050100000000806005\r\n
响应:
b:NoSolution\r\n
Sudoku求解算法见《谈谈数独(Sudoku)》【158-28】。假设已有一个函数能求解Sudoku,原型如下:
string solveSudoku(const string &puzzle);
函数的输入是“<81digits>”,输出是“<81digits>”或“NoSolution”。这个函数是个pure function,也是线程安全的。
//examples/sudoku/server_basic.cc
const int kCells = 81; // Cells' number.
void OnMessage(const TcpConnectionPtr &connection, Buffer* buffer, Timestamp)
{
    LOG_DEBUG << connection->name();
    size_t len = buffer->readableBytes();
    while(len >= kCells + 2) // Read request data repeatedly. 2 stands for CRLF(\r\n)
    {
        const char* crlf = buffer->findCRLF();
        if(crlf) // If found a complete request.
        {
            string request(buffer->peek(), crlf); // Copy request data.
            buffer->retrieveUntil(crlf + 2); // Retrieve data that has been read.
            len = buffer->readableBytes(); // Update buffer's length
            if(ProcessRequest(connection, request) == false) // Illegal request, close the connection.
            {
                connection->send("Bad Request!\r\n");
                connection->shutdown();
                break;
            }
        }
        else // Incomplete request.
        {
            break;
        }
    }
}

bool ProcessRequest(const TcpConnectionPtr &connection, const string &request)
{
    string id;
    string puzzle;
    bool good_request = true;

    string::const_iterator colon = find(request.begin(), request.end(), ':');
    if(colon != request.end()) // If found the `id` part.
    {
        id.assign(request.begin(), colon);
        puzzle.assign(colon+1, request.end());
    }
    else
    {
        puzzle = request;
    }

    if(puzzle.size() == const_cast(kCells)) // Request's length is legal.
    {
        LOG_DEBUG << connection->name();
        string result = SolveSudoku(puzzle); // Get answer at here.
        if(id.empty())
        {
            connection->send(result+"\r\n");
        }
        else
        {
            connection->send(id+":"+result+"\r\n");
        }
    }
    else
    {
        good_request = false;
    }
    return good_request;
}
onMessage()的主要功能是处理协议格式,并调用solveSudoku()求解问题。这个函数应该能正确处理TCP分包。 server_basic.cc是一个并发服务器,可以同时服务多个客户连接。但它是单线程的,无法发挥多核硬件的能力。

6.6.2常见的并发网络服务程序设计方案

POSA2总结了可伸缩网络编程,以下文章也值得参考。 http://bulk.fefe.de/scalable-networking.pdf
http://www.kegel.com/c10k.html
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

\

表6-1是12种常见方案。
“互通”是指如果开发chat服务,多个客户连接之间是否能方便地交换数据。 “顺序性”是指在httpd/Sudoku这类请求响应服务中,如果客户连接顺序发送多个请求,那么计算得到的多个响应是否按相同的顺序发还给客户(这里指在自然条件下,不含刻意同步)。 UNP CSDA(Client-Server Design Alternatives)方案归入0-5。方案5是目前用得很多的单线程Reactor方案。
方案6和方案7不是实用的方案,只是过渡品。

方案0:accept+read/write

这不是并发服务器,而是iterative服务器,因为它一次只能服务一个客户。代码见[UNP] Figure 1.9。这个方案不适合长连接,适合daytime这种write-only短连接服务。
recipes/python/echo-iterative.py
 1  #!/usr/bin/python
 2  
 3  import socket
 4  
 5  def handle(client_socket, client_address):
 6      while True:
 7          data = client_socket.recv(4096)
 8          if data:
 9              sent = client_socket.send(data)
10          else:
11              print "disconnect", client_address
12              client_socket.close()
13              break
14  
15  if __name__ == "__main__":
16      listen_address = ("0.0.0.0", 2007)
17      server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
18      server_socket.bind(listen_address)
19      server_socket.listen(5)
20  
21      while True:
22          (client_socket, client_address) = server_socket.accept()
23          print "got connection from", client_address
24          handle(client_socket, client_address)
L6-L13是echo服务的“业务逻辑循环”。
L21-L24说明它一次只能服务一个客户连接。后面列举的方案都是在保持这个循环的功能不变的情况下,设法高效地同时服务多个客户端。

方案1:accept+fork

process-per-connection。
适合并发连接数不大并且计算响应的工作量远大于fork()的开销的情况,如数据库服务器。 适合长连接,不适合短连接,因为fork()时间长、开销大。
recipes/python/echo-fork.py
 1  #!/usr/bin/python
 2  
 3  from SocketServer import BaseRequestHandler, TCPServer
 4  from SocketServer import ForkingTCPServer, ThreadingTCPServer
 5  
 6  class EchoHandler(BaseRequestHandler):
 7      def handle(self):
 8          print "got connection from", self.client_address
 9          while True:
10              data = self.request.recv(4096)
11              if data:
12                  sent = self.request.send(data)
13              else:
14                  print "disconnect", self.client_address
15                  self.request.close()
16                  break
17  
18  if __name__ == "__main__":
19      listen_address = ("0.0.0.0", 2007)
20      server = ForkingTCPServer(listen_address, EchoHandler)
21      server.serve_forever()
L9-L16是业务逻辑循环,self.request代替前面的client_socket。ForkingTCPServer会对每个客户连接新建一个子进程,在子进程中调用EchoHandler.handle(),从而同时服务多个客户端。

方案2:accept+thread

thread-per-connection初始化开销比方案1小很多,但仍不适合短连接服务。
该方案的伸缩性受到线程数的限制,几千个的话对操作系统的scheduler是个不小的负担。 只改动了一行代码。ThreadingTCPServer会对每个客户连接新建一个线程,在该线程中调用EchoHandler.handle()。
$ diff -U2 echo-fork.py echo-thread.py 
 if __name__ == "__main__":
     listen_address = ("0.0.0.0", 2007)
-    server = ForkingTCPServer(listen_address, EchoHandler)
+    server = ThreadingTCPServer(listen_address, EchoHandler)
     server.serve_forever()
这里体现了将“并发策略”与“业务逻辑”EchoHandler.handle()分离的思路。用同样的思路重写方案0的代码:
$ diff -U2 echo-fork.py echo-single.py 
 if __name__ == "__main__":
     listen_address = ("0.0.0.0", 2007)
-    server = ForkingTCPServer(listen_address, EchoHandler)
+    server = TCPServer(listen_address, EchoHandler)
     server.serve_forever()

方案3:prefork

这是对方案1的优化,[UNP]分析了几种变化,包括对accept(2)惊群问题thundering herd的考虑。

方案4:pre threaded

这是对方案2的优化,[UNP]分析了几种变化。方案3和方案4都是Apache httpd长期使用的方案。 以上方案都是阻塞式网络编程,程序流程(thread of control)通常阻塞在read()上,等待数据到达。但是TCP是全双工协议,同时支持read()和write(),当一个线程/进程阻塞在read()上,但程序又想给这个TCP连接发数据,怎么办?比如echo client,既要从stdin读,又要从网络读,当程序正在阻塞地读网络的时候,如何处理键盘输入? 一种方法是用两个线程/进程,一个负责读,一个负责写。7.13-Python双线程TCP relay的例子,另见Python Pinhole http://code.activestate.com/recipes/114642/。 另一种方法是IO multiplexing,即select/poll/epoll/kqueue等的“多路选择器”,让一个thread of control能处理多个连接。“IO复用”其实复用的不是IO连接,而是复用线程。使用select/poll几乎肯定要配合non-blocking IO;而使用non-blocking IO肯定要使用应用层buffer,原因见§7.4。如果每个程序都做一套自己的IO multiplexing机制(本质是event-driven事件驱动),这是一种很大的浪费。Reactor模式让event-driven网络编程有章可循。
recipes/python/echo-poll.py
 6  server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 7  server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 8  server_socket.bind(('', 2007))
 9  server_socket.listen(5)
10  # server_socket.setblocking(0)
11  poll = select.poll() # epoll() should work the same
12  poll.register(server_socket.fileno(), select.POLLIN)
13  
14  connections = {}
15  while True:
16      events = poll.poll(10000)  # 10 seconds
17      for fileno, event in events:
18          if fileno == server_socket.fileno():
19              (client_socket, client_address) = server_socket.accept()
20              print "got connection from", client_address
21              # client_socket.setblocking(0)
22              poll.register(client_socket.fileno(), select.POLLIN)
23              connections[client_socket.fileno()] = client_socket
24          elif event & select.POLLIN:
25              client_socket = connections[fileno]
26              data = client_socket.recv(4096)
27              if data:
28                  client_socket.send(data) # sendall() partial?
29              else:
30                  poll.unregister(fileno)
31                  client_socket.close()
32                  del connections[fileno]
该代码没开启non-blocking,没考虑数据发送不完整(L28)等情况。
L14定义一个从文件描述符到socket对象的映射。 L15-L32是一个事件循环,每当有IO事件发生时,就针对不同的文件描述符fileno执行不同的操作L16、L17。 L18-L23对于listening fd,接受(accept)新连接,并注册到IO事件关注列表(watch list),然后把连接添加到connections字典中。 L24-L32对于客户连接,读取并回显数据,并处理连接的关闭。
对于echo服务,业务逻辑只有L28:将收到的数据原样发回客户端。 以上代码不是功能完善的IO multiplexing范本,它没有考虑错误处理,没有实现定时功能,只适合listen一个端口的网络服务程序。如果需要侦听多个端口,或者要同时扮演客户端,那么代码的结构需要推倒重来。 这个代码框架可用于实现多种TCP服务器。例如写一个聊天服务只需改动3行代码,如下所示。业务逻辑是L28-L30:将本连接收到的数据转发给其他客户连接,但是这种把业务逻辑隐藏在一个循环中的做法不利于功能扩展。
$ diff echo-poll.py chat-poll.py -U4
--- echo-poll.py    2015-04-02 15:36:58.000000000 +0800
+++ chat-poll.py    2015-04-02 15:36:58.000000000 +0800
@@ -24,9 +24,11 @@
23          elif event & select.POLLIN:
24              client_socket = connections[fileno]
25              data = client_socket.recv(4096)
26              if data:
27 -                client_socket.send(data) # sendall() partial?
28 +                for (fd, other_socket) in connections.iteritems():
29 +                    if other_socket != client_socket:
30 +                        other_socket.send(data) # sendall() partial?
31              else:
32                  poll.unregister(fileno)
33                  client_socket.close()
34                  del connections[fileno]
Reactor模式的主要思想:网络编程中有很多是事务性(routine)的工作,可以提取为公用的框架/库,用户只需要填上关键的业务逻辑代码,并将回调注册到框架中,就能实现完整的网络服务。
Reactor模式的意义:将消息(IO事件)分发到用户提供的处理函数,并保持网络部分的通用代码不变,独立于用户的业务逻辑。

\

图6-11左图为单线程Reactor的执行顺序。Reactor事件循环所在的线程称为IO线程:通常由网络库负责读写socket,用户代码负责解码、计算、编码。
没有事件时,线程等待在select/poll/epoll_wait等函数上。 事件到达后由网络库处理IO,再把消息通知(即回调)客户端代码。 因为图6-11只有一个线程,所以事件是顺序处理的,一个线程同时只能做一件事。在协作式多任务中,事件的优先级得不到保证,因为从“poll返回后”到“下一次调用poll进入等待之前”这段时间内,线程不会被其他连接上的数据或事件抢占(图6-11右图)。如果想延迟计算(把compute()推迟100ms),那么不能用sleep()之类的阻塞调用,而应该注册超时回调,以避免阻塞当前IO线程。

方案5:poll(reactor)

基本的单线程Reactor方案(图6-11),即server_basic.cc。
优点:由网络库完成数据收发,程序只关心业务逻辑。
缺点:适合IO密集的应用,不适合CPU密集的应用,因为难发挥多核的威力。 与方案2相比,方案5处理网络消息的延迟略大。因为方案2一次read(2)系统调用就能拿到请求数据,而方案5要先poll(2)再read(2),多了一次系统调用。
recipes/python/echo-reactor.py
 6  server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 7  server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 8  server_socket.bind(('', 2007))
 9  server_socket.listen(5)
10  # serversocket.setblocking(0)
11  
12  poll = select.poll() # epoll() should work the same
13  connections = {}
14  handlers = {}
15  
16  def handle_input(socket, data):
17      socket.send(data) # sendall() partial?
18  
19  def handle_request(fileno, event):
20      if event & select.POLLIN:
21          client_socket = connections[fileno]
22          data = client_socket.recv(4096)
23          if data:
24              handle_input(client_socket, data)
25          else:
26              poll.unregister(fileno)
27              client_socket.close()
28              del connections[fileno]
29              del handlers[fileno]
30  
31  def handle_accept(fileno, event):
32      (client_socket, client_address) = server_socket.accept()
33      print "got connection from", client_address
34      # client_socket.setblocking(0)
35      poll.register(client_socket.fileno(), select.POLLIN)
36      connections[client_socket.fileno()] = client_socket
37      handlers[client_socket.fileno()] = handle_request
38  
39  poll.register(server_socket.fileno(), select.POLLIN)
40  handlers[server_socket.fileno()] = handle_accept
41  
42  while True:
43      events = poll.poll(10000)  # 10 seconds
44      for fileno, event in events:
45          handler = handlers[fileno]
46          handler(fileno, event)
程序核心是事件循环L42-L46。与前面不同的是,事件的处理通过handlers转发到各个函数中,不再集中在一起。如listening fd的处理函数是handle_accept,它会注册客户连接的handler。普通客户连接的处理函数是handle_request,其中又把连接断开和数据到达(由handle_input处理)这两个事件分开。业务逻辑位于单独的handle_input函数,实现了分离。 如果要改成聊天服务,重新定义handle_input函数即可,程序的其余部分保持不变。
$ diff -U1 echo-reactor.py chat-reactor.py
 def handle_input(socket, data):
-    socket.send(data) # sendall() partial?
+    for (fd, other_socket) in connections.iteritems():
+        if other_socket != socket:
+            other_socket.send(data) # sendall() partial?
在使用非阻塞IO+事件驱动方式编程的时候,一定要注意避免在事件回调中执行耗时的操作,包括阻塞IO等,否则会影响程序的响应。

方案6:reactor + thread-per-task

这是一个过渡方案,收到Sudoku请求后,不在Reactor线程计算,而是创建一个新线程去计算,以充分利用多核CPU。为每个请求(而不是每个连接)创建一个新线程的开销可以用线程池来避免,即方案8。 该方案还有一个特点是out-of-order,即同时创建多个线程去计算同一个连接上收到的多个请求,那么算出结果的次序是不确定的,可能后到的简单请求比先到的复杂请求先算出结果。这也是设计协议时使用id的原因,以便客户端区分response对应的是哪个request。

方案7:reactor + worker thread

为了让返回结果的顺序确定,可以为每个连接创建一个计算线程,每个连接上的请求固定发给同一个线程去算,先到先得。这也是一个过渡方案,因为并发连接数受限于线程数目,这个方案或许还不如直接使用阻塞IO的方案2 thread-per-connection。 方案7与6的另一区别是单个client的最大CPU占用率。
在方案6中,一个TCP连接上发来的一长串突发请求(burst requests)可以占满全部8个core。 在方案7中,由于每个连接上的请求固定由同一个线程处理,那么它最多占用12.5%的CPU。
使用哪种方案取决于应用场景的需要:公平性重要还是突发性能重要。这个区别在方案8和9中同样存在,需要根据应用来取舍。

方案8:reactor + thread pool

\

图6-12:使用固定大小的线程池弥补方案6中为每个请求创建线程的缺陷。全部的IO工作都在一个Reactor线程完成,计算任务交给thread pool。如果计算任务彼此独立,且IO压力不大,那么这种方案非常适用。Sudoku Solver正好符合。代码见:examples/sudoku/server_threadpool.cc。
bool ProcessRequest(const TcpConnectionPtr &connection, const string &request)
{
    string id;
    string puzzle;
    bool goodRequest = true;

    string::const_iterator colon = find(request.begin(), request.end(), ':');
    if (colon != request.end())
    {
        id.assign(request.begin(), colon);
        puzzle.assign(colon+1, request.end());
    }
    else
    {
        puzzle = request;
    }

    if (puzzle.size() == static_cast(kCells))
    {
        thread_pool_.run(bind(&solve, connection, puzzle, id));
    }
    else
    {
        goodRequest = false;
    }
    return goodRequest;
}

static void solve(const TcpConnectionPtr &connection,
                  const string &puzzle,
                  const string &id)
{
    LOG_DEBUG << connection->name();
    string result = SolveSudoku(puzzle);
    if (id.empty())
    {
        connection->send(result+"\r\n");
    }
    else
    {
        connection->send(id+":"+result+"\r\n");
    }
}
方案8使用线程池的代码与单线程Reactor的方案5相比,把原来onMessage()中涉及计算和发回响应的部分抽出来做成solve函数,然后交给ThreadPool去计算。方案8有乱序返回的可能,客户端要根据id来匹配响应。 线程池的另一个作用是执行阻塞操作。比如有的数据库的客户端只提供同步访问,那么可以把数据库查询放到线程池中,可以避免阻塞IO线程,不会影响其他客户连接。另外也可以用线程池来调用一些阻塞的IO函数。 如果IO的压力大,一个Reactor处理不过来,可以使用方案9,它采用多个Reactor来分担负载。

方案9:reactors in threads

这是muduo内置的多线程方案,特点是one loop per thread,有一个main Reactor负责accept(2)连接,然后把连接挂在某个sub Reactor中(muduo采用round-robin算法来选择sub Reactor),这样该连接的所有操作都在那个sub Reactor所处的线程中完成。多个连接可能被分派到多个线程中,以充分利用CPU。 muduo采用的是固定大小的Reactor pool,池子的大小根据CPU数目确定,即线程数固定,这样程序的总体处理能力不会随连接数增加而下降。由于一个连接完全由一个线程管理,那么请求的顺序性有保证,突发请求也不会占满全部8个核(如果需要优化突发请求,可以考虑方案11)。这种方案把IO分派给多个线程,防止出现一个Reactor的处理能力饱和。 与方案8的线程池相比,方案9减少了进出thread pool的两次上下文切换,在把多个连接分散到多个Reactor线程之后,小规模计算可以在当前IO线程完成并发回结果,从而降低响应的延迟。

\

图6-13。代码examples/sudoku/server_multiloop.cc,它与server_basic.cc的区别很小,最关键的只有一行代码:server_.setThreadNum(numThreads);
$ diff -u server_basic.cc server_multiloop.cc 
--- server_basic.cc 2016-09-28 01:02:54.000000000 +0800
+++ server_multiloop.cc 2016-09-28 01:02:54.000000000 +0800
@@ -20,18 +20,21 @@
 class SudokuServer
 {
  public:
-  SudokuServer(EventLoop* loop, const InetAddress& listenAddr)
+  SudokuServer(EventLoop* loop, const InetAddress& listenAddr, int numThreads)
     : server_(loop, listenAddr, "SudokuServer"),
+      numThreads_(numThreads),
       startTime_(Timestamp::now())
   {
     server_.setConnectionCallback(
         boost::bind(&SudokuServer::onConnection, this, _1));
     server_.setMessageCallback(
         boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));
+    server_.setThreadNum(numThreads);
   }

   void start()
   {
+    LOG_INFO << "starting " << numThreads_ << " threads.";
     server_.start();
   }

@@ -113,15 +116,21 @@
   }

   TcpServer server_;
+  int numThreads_;
   Timestamp startTime_;
 };

 int main(int argc, char* argv[])
 {
   LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
+  int numThreads = 0;
+  if (argc > 1)
+  {
+    numThreads = atoi(argv[1]);
+  }
   EventLoop loop;
   InetAddress listenAddr(9981);
-  SudokuServer server(&loop, listenAddr);
+  SudokuServer server(&loop, listenAddr, numThreads);

   server.start();

方案10:reactors in processes

这是Nginx的内置方案。如果连接之间无交互,这种方案也很好。工作进程之间相互独立,可以热升级。

方案11

把方案8和9混合,既使用多个Reactor来处理IO,又使用线程池来处理计算。适合既有突发IO(利用多线程处理多个连接上的IO),又有突发计算的应用(利用线程池把一个连接上的计算任务分配给多个线程去做)。

\

图6-14。把方案8的代码加一行server_.setThreadNum(numThreads);就成为该方案的代码。 一个程序应该使用几个event loop?
ZeroMQ手册给出的建议:按照每千兆比特每秒的吞吐量配一个event loop的比例来设置event loop的数目,即muduo::TcpServer::setThreadNum()的参数。
依据该规则,在编写运行于千兆以太网上的网络程序时,用一个event loop就足以处理网络IO。如果程序计算量少,瓶颈在网络带宽,那么可以按这条规则,只用一个event loop。另一方面,如果程序的IO带宽较小,计算量较大,而且对延迟不敏感,那么可以把计算放到thread pool中,也可以只用一个event loop。 以上假定TCP连接是同质的,没有优先级之分,我们看重的是服务程序的总吞吐量。但如果TCP连接有优先级之分,那么单个event loop不适合,正确的做法是把高优先级的连接用单独的event loop来处理。 在muduo中,属于同一个event loop的连接之间没有事件优先级的差别。这样设计的原因是为了防止优先级反转。
比如一个程序有10个心跳连接,10个数据请求连接,都归属同一个event loop,心跳连接有较高的优先级,心跳连接上的事件应该优先处理。但是由于事件循环,如果数据请求连接上的数据先于心跳连接到达(早到1ms),那么该event loop就会调用相应的event handler去处理数据请求,而在下一次epoll_wait()的时候再来处理心跳事件。
所以在同一个event loop中区分连接的优先级不能达到预想的效果。应该用单独的event loop来管理心跳连接,这样能避免数据连接上的事件阻塞了心跳事件,因为它们分属不同的线程。

结语

§3.3推荐的C++多线程服务端编程模式为:one loop per thread + thread pool。
? event loop用做non-blocking IO和定时器。
? thread pool用做计算,具体可以是任务队列或生产者消费者队列。

\

表6-2。实用方案有5种,muduo直接支持后4种。N表示并发连接数目,C1和C2是与连接数无关、与CPU数目有关的常数。 用银行柜台办理业务为比喻,简述各种模型的特点。
银行有旋转门,办理业务的客户从旋转门进出(IO)。 银行有柜台,客户在柜台办理业务(计算)。 要想办理业务,客户要先通过旋转门进入银行;办理完之后,客户要再次通过旋转门离开银行。 一个客户可以办理多次业务,每次都必须从旋转门进出(TCP长连接)。 旋转门一次只允许一个客户通过(无论进出),因为read()/write()只能同时调用其中一个。 方案5。
银行有一个旋转门、一个柜台,每次只允许一名客户办理业务。 当有人在办理业务时,旋转门是锁住的(计算和IO在同一线程)。 银行要求客户应尽快办理业务,否则会阻塞其他堵在门外的客户。 如果一次办不完,应离开柜台,到门外等着,等银行通知再来继续办理(分阶段回调)。
若客户少,这是经济、高效的方案;但如果场地大(多核),这就浪费了资源,只能并发(concurrent)不能并行(parallel)。 方案8。
银行有一个旋转门,一个或多个柜台。 银行进门后有一个队列,客户在这里排队到柜台(线程池)办理业务。即在单线程Reactor后面接了一个线程池用于计算,可利用多核。 旋转门是不锁的,随时都可以进出。但排队会消耗时间,方案5的客户一进门就能立刻办理业务。
另一做法是线程池里的每个线程有自己的任务队列,而不是整个线程池共用一个任务队列。好处是避免全局队列的锁争用,坏处是计算资源有可能分配不平均,降低并行度。 方案9。
该大银行包含方案5的多家小银行,每个客户进门时被固定分配到某间小银行中,他的业务只能由这间小银行办理,他每次都要进出小银行的旋转门。 大银行可同时服务多个客户。 同样要求办理业务时不能空等(阻塞),否则会影响分到同一间小银行的其他客户。 必要时可为VIP客户单独开一间或几间小银行,优先办理VIP业务。这跟方案5不同,当普通客户在办理业务的时候,VIP客户也只能在门外等着。 方案11。
银行有多个旋转门、多个柜台,旋转门和柜台之间没有对应关系。 客户进门时被固定分配到某一旋转门中(4.6易于实现线程安全的IO)。 进入旋转门后,有一个队列,客户在此排队到柜台办理业务。
该方案的资源利用率可能比方案9更高,一个客户不会被同一小银行的其他客户阻塞,但延迟也比方案9略大。

Welcome to my github: https://github.com/gaoxiangnumber1

点击复制链接 与好友分享!回本站首页
相关TAG标签 muduo网络库
上一篇:Zabbix 生产案例
下一篇:Zabbix 3.0 监控MySQL [六]
相关文章
图文推荐
点击排行

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站