Ceph Network Communication Mechanism and Source Code Analysis

17-12-02

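As a distributed storage system, Ceph needs a reliable network communication module for messaging between clients and servers and among the cluster's nodes. The network module lives under ceph/src/msg in the source tree and provides the basic framework for network communication. That directory also contains three implementations of the messenger interface: simple, async, and xio. Because simple is relatively straightforward and, at the time of writing, is the one usable in production, only simple is covered here.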

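For each connection, simple creates two threads: one listens for and reads events on that endpoint, and the other handles writes. After receiving a request, the reader thread parses the network stream, builds the message, and dispatches it to the downstream Dispatcher. The writer thread sleeps most of the time and is woken only when there is a new message to send.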
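The reader/writer split can be sketched with standard C++ threads. This is a toy model under stated assumptions, not Ceph's code: ToyChannel (a blocking in-memory queue standing in for a socket) and ToyPipe are hypothetical names, and the "Dispatcher" is just a callback. It shows the two properties described above: the reader loops on incoming frames and hands each one off, while the writer sleeps on a condition variable until a message is queued.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket: a blocking FIFO of frames.
class ToyChannel {
 public:
  void push(std::string frame) {
    std::lock_guard<std::mutex> l(mu_);
    q_.push_back(std::move(frame));
    cv_.notify_one();
  }
  // Blocks until a frame is available; returns false once closed and drained.
  bool pop(std::string* out) {
    std::unique_lock<std::mutex> l(mu_);
    cv_.wait(l, [this] { return !q_.empty() || closed_; });
    if (q_.empty()) return false;
    *out = std::move(q_.front());
    q_.pop_front();
    return true;
  }
  void close() {
    std::lock_guard<std::mutex> l(mu_);
    closed_ = true;
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::string> q_;
  bool closed_ = false;
};

// One connection = one reader thread + one writer thread.
class ToyPipe {
 public:
  ToyPipe(ToyChannel* in, ToyChannel* out,
          std::function<void(const std::string&)> dispatch)
      : in_(in), out_(out), dispatch_(std::move(dispatch)) {
    // Start the threads only after every member is initialized.
    reader_ = std::thread([this] { reader(); });
    writer_ = std::thread([this] { writer(); });
  }
  ~ToyPipe() { stop(); }

  // Queue a message and wake the sleeping writer.
  void send(std::string msg) {
    std::lock_guard<std::mutex> l(mu_);
    out_q_.push_back(std::move(msg));
    cond_.notify_one();
  }

  // Let the writer flush out_q_, close the inbound side, join both threads.
  void stop() {
    {
      std::lock_guard<std::mutex> l(mu_);
      if (stopping_) return;
      stopping_ = true;
      cond_.notify_one();
    }
    in_->close();
    writer_.join();
    reader_.join();
  }

 private:
  void writer() {
    std::unique_lock<std::mutex> l(mu_);
    for (;;) {
      // Sleep until there is something to send or we are stopping.
      cond_.wait(l, [this] { return !out_q_.empty() || stopping_; });
      if (out_q_.empty()) return;
      std::string m = std::move(out_q_.front());
      out_q_.pop_front();
      l.unlock();  // don't hold the lock during "socket" I/O
      out_->push(std::move(m));
      l.lock();
    }
  }
  void reader() {
    std::string frame;
    while (in_->pop(&frame)) dispatch_(frame);  // hand off to the dispatcher
  }

  ToyChannel *in_, *out_;
  std::function<void(const std::string&)> dispatch_;
  std::mutex mu_;
  std::condition_variable cond_;
  std::deque<std::string> out_q_;
  bool stopping_ = false;
  std::thread reader_, writer_;
};
```

Wiring two ToyPipe objects together through two ToyChannel instances, calling send() on one and then stop() on both, delivers the messages in order to the other pipe's dispatch callback. Unlocking around the "socket" push mirrors the general rule that a writer should not hold the queue lock while doing I/O, so senders are never blocked behind a slow write.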

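Messenger is the core data structure of the network module and is responsible for receiving and sending messages. An OSD has two main Messengers: ms_public handles messages exchanged with clients, and ms_cluster handles messages exchanged with other OSDs.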


Basic Classes

Message

This is the base class for messages: every message that is sent derives from it. A message consists of a header, the data (user_data), and an end marker (footer).

Open ceph/src/msg/Message.h:

class Message : public RefCountedObject {
protected:
  ceph_msg_header  header;      // header envelope
  ceph_msg_footer  footer;
  bufferlist       payload;  // "front" unaligned blob
  bufferlist       middle;   // "middle" unaligned blob
  bufferlist       data;     // data payload (page-alignment will be preserved where possible)

  ...
  ...
  ...

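The class wraps ceph_msg_header and ceph_msg_footer, the structs for the message header and the end-of-message marker. The data itself is made up of three parts: payload (usually holds the related metadata), middle (reserved), and data (the data being read or written).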
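To make the header/body/footer split concrete, here is a hypothetical and heavily simplified encoding. ToyHeader, ToyFooter, ToyMessage, encode and decode are illustrative names, not Ceph's; the real header and footer carry more fields (sequence numbers, CRCs, source address, and so on), and Ceph's wire encoding is little-endian, whereas this sketch copies fields in host byte order. The point it shows is that the per-segment lengths stored in the header tell the receiver how to split the body back into payload, middle and data.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical, simplified stand-ins for ceph_msg_header / ceph_msg_footer.
struct ToyHeader {
  std::uint16_t type = 0;
  std::uint32_t front_len = 0;   // length of payload ("front")
  std::uint32_t middle_len = 0;  // length of middle
  std::uint32_t data_len = 0;    // length of data
};
struct ToyFooter {
  std::uint8_t flags = 0;        // e.g. a "message complete" marker
};
struct ToyMessage {
  ToyHeader header;
  std::string payload, middle, data;  // the three body segments
  ToyFooter footer;
};

// Append / read one fixed-size field (host byte order, for brevity).
template <typename T>
void put_field(std::string& out, T v) {
  out.append(reinterpret_cast<const char*>(&v), sizeof v);
}
template <typename T>
T get_field(const std::string& in, std::size_t& off) {
  T v;
  std::memcpy(&v, in.data() + off, sizeof v);
  off += sizeof v;
  return v;
}

// Layout: header | payload | middle | data | footer
std::string encode(ToyMessage m) {
  m.header.front_len = static_cast<std::uint32_t>(m.payload.size());
  m.header.middle_len = static_cast<std::uint32_t>(m.middle.size());
  m.header.data_len = static_cast<std::uint32_t>(m.data.size());
  std::string out;
  put_field(out, m.header.type);
  put_field(out, m.header.front_len);
  put_field(out, m.header.middle_len);
  put_field(out, m.header.data_len);
  out += m.payload;
  out += m.middle;
  out += m.data;
  put_field(out, m.footer.flags);
  return out;
}

// The lengths in the header tell the reader where each segment ends.
ToyMessage decode(const std::string& in) {
  ToyMessage m;
  std::size_t off = 0;
  m.header.type = get_field<std::uint16_t>(in, off);
  m.header.front_len = get_field<std::uint32_t>(in, off);
  m.header.middle_len = get_field<std::uint32_t>(in, off);
  m.header.data_len = get_field<std::uint32_t>(in, off);
  m.payload = in.substr(off, m.header.front_len);
  off += m.header.front_len;
  m.middle = in.substr(off, m.header.middle_len);
  off += m.header.middle_len;
  m.data = in.substr(off, m.header.data_len);
  off += m.header.data_len;
  m.footer.flags = get_field<std::uint8_t>(in, off);
  return m;
}
```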

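ceph_msg_header mainly encapsulates information describing the message data.

SimpleMessenger

SimpleMessenger is simple's implementation of the Messenger interface.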

class SimpleMessenger : public SimplePolicyMessenger {
  // implementation of the Messenger interface

public:
  Accepter accepter;             // listens for incoming connection requests
  DispatchQueue dispatch_queue;  // queue of received messages awaiting dispatch

  friend class Accepter;

  // creates a Pipe to the given address
  Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con,
             Message *first);

  /**
   * Queue up a Message for delivery to the entity specified
   * by addr and dest_type.
   * submit_message() is responsible for creating
   * new Pipes (and closing old ones) as necessary.
   */
  void submit_message(Message *m, PipeConnection *con,
              const entity_addr_t& addr, int dest_type,
              bool already_locked);

  friend class Pipe;

  // look up an existing Pipe by peer address
  Pipe *_lookup_pipe(const entity_addr_t& k) {
    ceph::unordered_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k);
    if (p == rank_pipe.end())
      return NULL;
    // see lock cribbing in Pipe::fault()
    if (p->second->state_closed)
      return NULL;
    return p->second;
  }

};
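The lookup-then-create logic around _lookup_pipe, connect_rank and submit_message can be modeled with a hash map from peer address to pipe. This is a sketch under simplifying assumptions: PipeStub and MessengerStub are hypothetical types, addresses are plain strings, there is no locking, and a closed pipe is simply replaced and freed, whereas the real SimpleMessenger reaps closed pipes separately.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical, simplified Pipe: only the state needed for the lookup.
struct PipeStub {
  explicit PipeStub(std::string a) : peer_addr(std::move(a)) {}
  std::string peer_addr;
  bool state_closed = false;
};

class MessengerStub {
 public:
  // Like _lookup_pipe: a closed pipe counts as "not found".
  PipeStub* lookup_pipe(const std::string& addr) {
    auto p = rank_pipe_.find(addr);
    if (p == rank_pipe_.end() || p->second->state_closed) return nullptr;
    return p->second.get();
  }
  // Plays the role of connect_rank: create a pipe and register it.
  PipeStub* connect_rank(const std::string& addr) {
    auto& slot = rank_pipe_[addr];
    slot = std::make_unique<PipeStub>(addr);  // replaces any closed pipe
    return slot.get();
  }
  // Reuse a live pipe, otherwise create one (the choice submit_message makes).
  PipeStub* get_pipe(const std::string& addr) {
    if (PipeStub* p = lookup_pipe(addr)) return p;
    return connect_rank(addr);
  }
  std::size_t size() const { return rank_pipe_.size(); }

 private:
  std::unordered_map<std::string, std::unique_ptr<PipeStub>> rank_pipe_;
};
```

A second get_pipe() call for the same address returns the same live pipe; once that pipe is marked closed, the lookup treats it as missing and a fresh one is created in its place.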

Connection

As the name suggests, a Connection represents a connection to a peer and is used to send and receive messages.

struct Connection : public RefCountedObject {
  mutable Mutex lock;
  Messenger *msgr;
  RefCountedObject *priv;
  int peer_type;
  entity_addr_t peer_addr;
  utime_t last_keepalive, last_keepalive_ack;
  // ...
  /**
   * Check whether the connection is ready to send messages.
   * @return true if ready to send, or false otherwise
   */
  virtual bool is_connected() = 0;
  // ...
  /**
   * Send a message; this is the main purpose of the class.
   * @param m The Message to send. The Messenger consumes a single reference
   * when you pass it in.
   * @return 0 on success, or -errno on failure.
   */
  virtual int send_message(Message *m) = 0;

  int send_message(boost::intrusive_ptr<Message> m)
  {
    return send_message(m.detach()); /* send_message(Message *m) consumes a reference */
  }
  // ...

};
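The comment on send_message says the Messenger "consumes a single reference". The hypothetical sketch below shows what that contract means for a caller. It uses a hand-rolled refcount instead of Ceph's RefCountedObject and a minimal MsgRef in place of boost::intrusive_ptr; every name here is illustrative. The smart-pointer overload calls detach() so the reference it already owns moves into send_message(Message*) without an extra get()/put() pair.

```cpp
#include <cassert>
#include <utility>

// Hypothetical refcounted base in the spirit of RefCountedObject:
// the creator starts out holding one reference.
struct RefCounted {
  int nref = 1;
  void get() { ++nref; }
  void put() { if (--nref == 0) delete this; }
  virtual ~RefCounted() = default;
};

struct ToyMessage : RefCounted {
  static int live;  // number of ToyMessage objects currently alive
  ToyMessage() { ++live; }
  ~ToyMessage() override { --live; }
};
int ToyMessage::live = 0;

// Minimal owning pointer with detach(), loosely modeled on boost::intrusive_ptr.
class MsgRef {
 public:
  explicit MsgRef(ToyMessage* m) : m_(m) {}  // adopts the caller's reference
  MsgRef(MsgRef&& o) noexcept : m_(o.detach()) {}
  MsgRef(const MsgRef&) = delete;
  MsgRef& operator=(const MsgRef&) = delete;
  ~MsgRef() { if (m_) m_->put(); }
  ToyMessage* detach() { return std::exchange(m_, nullptr); }

 private:
  ToyMessage* m_;
};

struct ToyConnection {
  int sent = 0;
  // Consumes one reference: the caller must not touch m afterwards
  // unless it took an extra reference first.
  int send_message(ToyMessage* m) {
    ++sent;    // a real connection would queue m on its pipe here
    m->put();  // release the reference we were handed
    return 0;
  }
  // Hand the smart pointer's reference over instead of taking a new one.
  int send_message(MsgRef m) { return send_message(m.detach()); }
};
```

In practice this means a caller who wants to keep using a message after sending it must take an extra reference first, exactly as the Ceph comment warns.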

Pipe

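Pipe implements what was described at the beginning: for each connection, it internally creates a reader thread and a writer thread to receive and send messages. It sits between Connection and Dispatcher and owns the reader and writer threads Pipe::reader_thread and Pipe::writer_thread, whose entry functions are Pipe::reader and Pipe::writer respectively. The socket itself is also encapsulated at this level (see sd and shutdown_socket() in the code below).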

class Pipe : public RefCountedObject {
    /**
     * The Reader thread handles all reads off the socket
     * (reader thread: receives messages).
     */
    class Reader : public Thread {
      Pipe *pipe;
    public:
      explicit Reader(Pipe *p) : pipe(p) {}
      void *entry() override { pipe->reader(); return 0; }
    } reader_thread;

    /**
     * The Writer thread handles all writes to the socket (after startup).
     * (writer thread: sends messages)
     */
    class Writer : public Thread {
      Pipe *pipe;
    public:
      explicit Writer(Pipe *p) : pipe(p) {}
      void *entry() override { pipe->writer(); return 0; }
    } writer_thread;
    // ...
    map<int, list<Message*> > out_q;  // priority queue of messages waiting to be sent
    DispatchQueue *in_q;              // queue that received messages are handed to
    list<Message*> sent;              // messages already sent, kept until the peer acks them
    Cond cond;
    bool send_keepalive;
    bool send_keepalive_ack;
    utime_t keepalive_ack_stamp;
    bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
    // ...
    __u32 connect_seq, peer_global_seq;
    uint64_t out_seq;                 // sequence number of the last outgoing message
    uint64_t in_seq, in_seq_acked;    // last received sequence number; last one acked to the peer
    // ...
    void set_socket_options();
    // ...
    int read_message(Message **pm,
             AuthSessionHandler *session_security_copy);
    int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body);
    void start_reader();
    void start_writer();
    void shutdown_socket() {
      recv_reset();
      if (sd >= 0)
        ::shutdown(sd, SHUT_RDWR);
    }
    // ...

  };
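The out_seq, in_seq and sent members support in-order delivery that survives reconnects. A hypothetical sketch of that bookkeeping follows; SeqMsg and SeqState are illustrative names, and a real Pipe does this under locks and exchanges acks over the socket. Each outgoing message is stamped with ++out_seq and kept in sent until acknowledged; an ack for sequence n releases every message up to n; the receiver ignores any message whose sequence number is not greater than in_seq, which is what makes resending after a reconnect harmless.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>
#include <string>
#include <utility>

// Hypothetical sketch of the sequence/ack bookkeeping kept by a Pipe.
struct SeqMsg {
  std::uint64_t seq;
  std::string body;
};

class SeqState {
 public:
  // Sender: stamp ++out_seq and keep the message until the peer acks it,
  // so it could be resent after a reconnect.
  SeqMsg send(std::string body) {
    SeqMsg m{++out_seq_, std::move(body)};
    sent_.push_back(m);
    return m;
  }
  // Sender: an ack for `seq` releases every message up to and including it.
  void handle_ack(std::uint64_t seq) {
    while (!sent_.empty() && sent_.front().seq <= seq) sent_.pop_front();
  }
  // Receiver: ignore anything already seen, otherwise advance in_seq.
  bool receive(const SeqMsg& m) {
    if (m.seq <= in_seq_) return false;
    in_seq_ = m.seq;
    return true;
  }
  std::uint64_t in_seq() const { return in_seq_; }
  std::size_t unacked() const { return sent_.size(); }

 private:
  std::uint64_t out_seq_ = 0;
  std::uint64_t in_seq_ = 0;
  std::list<SeqMsg> sent_;
};
```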

Dispatcher

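This class dispatches messages. Received requests accumulate in the Pipe's queues, and the Dispatcher is responsible for handing each Message to the concrete application layer (an OSD, for example, is itself a Dispatcher).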

class Dispatcher {
public:
  explicit Dispatcher(CephContext *cct_)
    : cct(cct_)
  {
  }
  virtual ~Dispatcher() { }

  /**
   * The Messenger calls this function to query if you are capable
   * of "fast dispatch"ing a message. Indicating that you can fast
   * dispatch it requires that you:
   * 1) Handle the Message quickly and without taking long-term contended
   * locks. (This function is likely to be called in-line with message
   * receipt.)
   * 2) Be able to accept the Message even if you have not yet received
   * an ms_handle_accept() notification for the Connection it is associated
   * with, and even if you *have* called mark_down() or received an
   * ms_handle_reset() (or similar) call on the Connection. You will
   * not receive more than one dead "message" (and should generally be
   * prepared for that circumstance anyway, since the normal dispatch can begin,
   * then trigger Connection failure before it's percolated through your system).
   * We provide ms_handle_fast_[connect|accept] calls if you need them, under
   * similar speed and state constraints as fast_dispatch itself.
   * 3) Be able to make a determination on fast_dispatch without relying
   * on particular system state -- the ms_can_fast_dispatch() call might
   * be called multiple times on a single message; the state might change between
   * calling ms_can_fast_dispatch and ms_fast_dispatch; etc.
   *
   * @param m The message we want to fast dispatch.
   * @returns True if the message can be fast dispatched; false otherwise.
   */
  virtual bool ms_can_fast_dispatch(const Message *m) const { return false; }

  /**
   * Perform a "fast dispatch" on a given message. See
   * ms_can_fast_dispatch() for the requirements.
   *
   * @param m The Message to fast dispatch.
   */
  virtual void ms_fast_dispatch(Message *m) { ceph_abort(); }

  /**
   * The Messenger calls this function to deliver a single message.
   *
   * @param m The message being delivered. You (the Dispatcher)
   * are given a single reference count on it.
   */
  virtual bool ms_dispatch(Message *m) = 0;

  /**
   * This function will be called whenever a Connection is newly-created
   * or reconnects in the Messenger.
   *
   * @param con The new Connection which has been established. You are not
   * granted a reference to it -- take one if you need one!
   */
  virtual void ms_handle_connect(Connection *con) {}

  /**
   * This function will be called synchronously whenever a Connection is
   * newly-created or reconnects in the Messenger, if you support fast
   * dispatch. It is guaranteed to be called before any messages are
   * dispatched.
   *
   * @param con The new Connection which has been established. You are not
   * granted a reference to it -- take one if you need one!
   */
  virtual void ms_handle_fast_connect(Connection *con) {}

  /**
   * Callback indicating we have accepted an incoming connection.
   *
   * @param con The (new or existing) Connection associated with the session
   */
  virtual void ms_handle_accept(Connection *con) {}
  ...
  ...
  ...

};
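A Messenger can have several Dispatchers registered (for example through add_dispatcher_head or add_dispatcher_tail), and a message is offered to them in turn until one returns true from ms_dispatch. The minimal sketch below models that loop; ToyMessage, ToyDispatcher, ToyMessenger and TypeDispatcher are hypothetical stand-ins, not the real classes.

```cpp
#include <cassert>
#include <vector>

struct ToyMessage { int type; };

// Simplified Dispatcher: return true if the message was consumed.
class ToyDispatcher {
 public:
  virtual ~ToyDispatcher() = default;
  virtual bool ms_dispatch(ToyMessage* m) = 0;
};

class ToyMessenger {
 public:
  void add_dispatcher_tail(ToyDispatcher* d) { dispatchers_.push_back(d); }
  // Offer the message to each dispatcher in order; stop at the first taker.
  bool deliver(ToyMessage* m) {
    for (ToyDispatcher* d : dispatchers_)
      if (d->ms_dispatch(m)) return true;
    return false;  // unhandled; in this sketch the caller decides what to do
  }

 private:
  std::vector<ToyDispatcher*> dispatchers_;
};

// Hypothetical dispatcher that only accepts one message type.
class TypeDispatcher : public ToyDispatcher {
 public:
  explicit TypeDispatcher(int t) : type_(t) {}
  bool ms_dispatch(ToyMessage* m) override {
    if (m->type != type_) return false;
    ++handled;
    return true;
  }
  int handled = 0;

 private:
  int type_;
};
```

The order of registration matters: a dispatcher added at the head sees every message before those added at the tail and can claim it first.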

Flow Analysis

Sending a message:

[Figure: message sending flow]

Image source: http://www.linuxidc.com/Linux/2015-10/124549.htm

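SimpleMessenger first obtains the corresponding Connection.

To do so, it looks for an existing Pipe and creates a new one if none exists.

Once it has the Connection, it calls the send function _send_message.

_send_message eventually calls submit_message, which checks the Pipe: if there is no Pipe, it creates one; if there is one and it is not in the closed state, it calls _send to append the message to the out_q send queue, which wakes the writer thread.

Once woken, each Pipe uses its writer_thread to send the messages in out_q; the thread's entry function is Pipe::writer.

Inside the writer thread, _get_next_outgoing takes a message from out_q, and write_message sends it out.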
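Because out_q is a map from priority to a list of messages, picking the next message amounts to taking the front of the highest-priority non-empty list. The sketch below illustrates that selection rule, which is the behavior it assumes for _get_next_outgoing; strings stand in for Message*, and the priority values in the usage are arbitrary.

```cpp
#include <cassert>
#include <iterator>
#include <list>
#include <map>
#include <string>
#include <utility>

// Priority -> FIFO of pending messages, shaped like Pipe::out_q.
using OutQueue = std::map<int, std::list<std::string>>;

void enqueue(OutQueue& q, int priority, std::string m) {
  q[priority].push_back(std::move(m));
}

// Pop the oldest message at the highest priority, dropping levels that
// become empty. Returns false when nothing is queued.
bool get_next_outgoing(OutQueue& q, std::string* out) {
  while (!q.empty()) {
    auto top = std::prev(q.end());  // largest key = highest priority
    if (!top->second.empty()) {
      *out = std::move(top->second.front());
      top->second.pop_front();
      if (top->second.empty()) q.erase(top);
      return true;
    }
    q.erase(top);
  }
  return false;
}
```

Messages at the same priority keep their FIFO order, while a later high-priority message (such as a keepalive or heartbeat) can overtake earlier low-priority traffic.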

Receiving a message:

[Figure: message receiving flow]

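The receiving thread starts from the entry function Pipe::reader.

1) After checking the Pipe's state, it calls tcp_read to read the tag.

2) Depending on the tag type, it reads a different kind of message.

3) It calls read_message to receive the message; when this function returns, the message has been fully received.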
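Steps 1) to 3) amount to a loop that reads a one-byte tag and then decides how much more to read. The sketch below parses an in-memory byte string instead of a socket. The Tag values and frame layouts are hypothetical simplifications (the real protocol defines its own CEPH_MSGR_TAG_* constants, and a real message frame carries a header and footer), so treat it as an illustration of the control flow only.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Illustrative tag values only; the real protocol defines its own constants.
enum Tag : std::uint8_t { TAG_CLOSE = 1, TAG_MSG = 2, TAG_ACK = 3, TAG_KEEPALIVE = 4 };

struct ReadResult {
  std::vector<std::string> messages;  // bodies of TAG_MSG frames
  std::vector<std::uint64_t> acks;    // sequence numbers from TAG_ACK frames
  int keepalives = 0;
  bool closed = false;
};

// Hypothetical framing: MSG = tag, u32 length, body; ACK = tag, u64 seq.
// Returns false on a truncated stream or unknown tag (a "fault").
bool read_stream(const std::string& in, ReadResult* r) {
  std::size_t off = 0;
  auto have = [&](std::size_t n) { return off + n <= in.size(); };
  while (have(1)) {
    auto tag = static_cast<std::uint8_t>(in[off++]);  // 1) read the tag
    switch (tag) {                                     // 2) branch on it
      case TAG_KEEPALIVE:
        ++r->keepalives;
        break;
      case TAG_ACK: {
        if (!have(8)) return false;
        std::uint64_t seq;
        std::memcpy(&seq, in.data() + off, 8);
        off += 8;
        r->acks.push_back(seq);
        break;
      }
      case TAG_MSG: {                                  // 3) read the message
        if (!have(4)) return false;
        std::uint32_t len;
        std::memcpy(&len, in.data() + off, 4);
        off += 4;
        if (!have(len)) return false;
        r->messages.push_back(in.substr(off, len));
        off += len;
        break;
      }
      case TAG_CLOSE:
        r->closed = true;
        return true;
      default:
        return false;
    }
  }
  return true;
}
```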

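Finally, the reader checks whether fast_dispatch can be used. If it can, the message is not added to mqueue (inside the DispatchQueue); instead the Pipe calls ms_fast_dispatch directly. Otherwise the message is added to that queue, and the DispatchQueue later calls ms_dispatch to handle it.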
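The fast-dispatch decision is a single branch taken by the reader once a message is complete. In this hypothetical model, ToyMessage, ToyDispatcher, ToyDispatchQueue and on_message_read are illustrative names, the separate dispatch thread is replaced by an explicit drain() call, and the choice of "osd_op" as the fast-dispatchable type is arbitrary.

```cpp
#include <cassert>
#include <deque>
#include <string>

struct ToyMessage { std::string type; };

// Hypothetical dispatcher: fast-dispatches only "osd_op" messages.
struct ToyDispatcher {
  int fast = 0, slow = 0;
  bool ms_can_fast_dispatch(const ToyMessage& m) const { return m.type == "osd_op"; }
  void ms_fast_dispatch(const ToyMessage&) { ++fast; }
  bool ms_dispatch(const ToyMessage&) { ++slow; return true; }
};

// Stand-in for DispatchQueue: queued messages wait for the dispatch thread.
struct ToyDispatchQueue {
  std::deque<ToyMessage> mqueue;
  // What the dispatch thread would do; driven explicitly in this sketch.
  void drain(ToyDispatcher& d) {
    while (!mqueue.empty()) {
      d.ms_dispatch(mqueue.front());
      mqueue.pop_front();
    }
  }
};

// Reader-side decision after read_message() returns a complete message.
void on_message_read(const ToyMessage& m, ToyDispatcher& d, ToyDispatchQueue& q) {
  if (d.ms_can_fast_dispatch(m))
    d.ms_fast_dispatch(m);   // handled inline on the reader thread
  else
    q.mqueue.push_back(m);   // deferred to the DispatchQueue
}
```

The trade-off is latency versus isolation: fast dispatch skips a queue hop and a thread switch, which is why the Dispatcher comments require fast-dispatched handlers to be quick and to avoid long-held contended locks.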
