频道栏目
首页 > 程序开发 > 软件开发 > 其他 > 正文
Tomcat 8的Connector部分
2016-09-28 09:05:12         来源:jiangjiajian2008的专栏  
收藏   我要投稿

在tomcat中,connector负责接收来自客户端的连接,并交由后续的代码进行处理。connector对象持有ProtocolHandler对象;ProtocolHandler对象持有AbstractEndpoint对象。AbstractEndpoint负责创建服务器套接字,并绑定到监听端口;同时还创建accepter线程来接收客户端的连接以及poller线程来处理连接中的读写请求。其结构如图1所示。
这里写图片描述
图1 Connector

Connector的入口在其构造函数。

    public Connector() {
        this(null);
    }

    public Connector(String protocol) {
        // 设置协议
        setProtocol(protocol);
        ProtocolHandler p = null;
        try {
            // 反射生成ProtocolHandler实例
            Class clazz = Class.forName(protocolHandlerClassName);
            p = (ProtocolHandler) clazz.newInstance();
        } catch (Exception e) {
            log.error(sm.getString(
                    "coyoteConnector.protocolHandlerInstantiationFailed"), e);
        } finally {
            this.protocolHandler = p;
        }

        if (!Globals.STRICT_SERVLET_COMPLIANCE) {
            URIEncoding = "UTF-8";
            URIEncodingLower = URIEncoding.toLowerCase(Locale.ENGLISH);
        }
    }

   public void setProtocol(String protocol) {

        if (AprLifecycleListener.isAprAvailable()) {
            if ("HTTP/1.1".equals(protocol)) {
                setProtocolHandlerClassName
                    ("org.apache.coyote.http11.Http11AprProtocol");
            } else if ("AJP/1.3".equals(protocol)) {
                setProtocolHandlerClassName
                    ("org.apache.coyote.ajp.AjpAprProtocol");
            } else if (protocol != null) {
                setProtocolHandlerClassName(protocol);
            } else {
                setProtocolHandlerClassName
                    ("org.apache.coyote.http11.Http11AprProtocol");
            }
        } else {
            if ("HTTP/1.1".equals(protocol)) {
                // tomcat8默认配置
                setProtocolHandlerClassName
                    ("org.apache.coyote.http11.Http11NioProtocol");
            } else if ("AJP/1.3".equals(protocol)) {
                setProtocolHandlerClassName
                    ("org.apache.coyote.ajp.AjpNioProtocol");
            } else if (protocol != null) {
                setProtocolHandlerClassName(protocol);
            }
        }

    }

Connector的构造函数带有协议属性,该协议属性是server.xml中Connector标签的protocol的属性值。Tomcat 8中默认值为HTTP/1.1,因此在Connector的构造函数中生成的是Http11NioProtocol对象。在setProtocol()方法中可以看到,tomcat8还包括其他几个协议处理器。协议处理器中带有Apr命名的都是使用Apr库来处理http请求的。通过使用APR库,Tomcat将使用JNI的方式来读取文件以及进行网络传输,可以大大提升Tomcat对静态文件的处理性能,同时如果你使用了HTTPS方式传输的话,也可以提升SSL的处理性能。AJP/1.3协议是Http服务器和应用服务器之间数据交互的协议,比如Apache服务器或IIS服务器与tomcat服务器之间进行数据交互。
Http11NioProtocol是非阻塞模式的Http1.1协议处理器,使用java的nio包来实现非阻塞。可以看到,在tomcat 8中,默认使用的是非阻塞IO。
在创建Http11NioProtocol实例的时候,会创建NioEndpoint、Http11ConnectionHandler实例。

    public Http11NioProtocol() {
        // 创建nioEndPoint
        endpoint=new NioEndpoint();
        // 创建Http11ConnectionHandler
        cHandler = new Http11ConnectionHandler(this);
        ((NioEndpoint) endpoint).setHandler(cHandler);
        // 是指socket被关闭时逗留的时间,值为-1。
        // 在这段时间内,socket会尽量把未送出去的数据给发出去。
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        // 设置读取数据超时
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        // 设置tcp_nodelay,
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }

NioEndpoint是Connector中处理客户端连接的核心类,负责创建服务器套接字,并绑定到监听端口;同时还创建accepter线程来接收客户端的连接以及poller线程来处理连接中的读写请求。
Connector方法实现Lifecycle接口,当Connector的拥有者调用init()及start()方法的时候,会分别执行Connector的initInternal()以及startInternal()方法。Connector的initInternal()方法会最终调用endpoint的init()方法;startInternal()方法则调用endpoint的start()方法。
endpoint的init()方法在其父类AbstractEndpoint里实现,并最终调用延迟到NioEndpoint中实现的bind()方法。

    public final void init() throws Exception {
        // 校验setUseCipherSuitesOrder方法是否存在
        testServerCipherSuitesOrderSupport();
        if (bindOnInit) {

            bind();
            // 设置状态
            bindState = BindState.BOUND_ON_INIT;
        }
    }

   /**
     * Initialize the endpoint.
     */
    @Override
    public void bind() throws Exception {
        // 打开serverSocketChannel
        serverSock = ServerSocketChannel.open();
        // 设置socket属性
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
        // 绑定监听端口
        serverSock.socket().bind(addr,getBacklog());
        // 设为阻塞模式
        serverSock.configureBlocking(true); //mimic APR behavior
        // 设置超时
        serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());

        // Initialize thread count defaults for acceptor, poller
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            //minimum one poller thread
            pollerThreadCount = 1;
        }
        stopLatch = new CountDownLatch(pollerThreadCount);

        // Initialize SSL if needed
        if (isSSLEnabled()) {
            SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this);

            sslContext = sslUtil.createSSLContext();
            sslContext.init(wrap(sslUtil.getKeyManagers()),
                    sslUtil.getTrustManagers(), null);

            SSLSessionContext sessionContext =
                sslContext.getServerSessionContext();
            if (sessionContext != null) {
                sslUtil.configureSessionContext(sessionContext);
            }
            // Determine which cipher suites and protocols to enable
            enabledCiphers = sslUtil.getEnableableCiphers(sslContext);
            enabledProtocols = sslUtil.getEnableableProtocols(sslContext);
        }

        if (oomParachute>0) reclaimParachute(true);
        // 打开阻塞模式的selector
        selectorPool.open();
    }

在bind()方法中,首先打开serverSocketChannel,并绑定到监听端口,此处将其该channel设置为阻塞模式。对于SSL部分,此处略过不讲。在最后的 selectorPool.open()执行语句中,会先获得共享的selector,并且创建线程在该selector上检测事件。这里有一点疑问地方,就是这里创建的selector有什么用?
经过上述过程,endpoint初始化完成,并且开启serverSocketChannel并监听端口。接着看下endpoint的start()方法。和init()方法一样,endpoint的start()方法是在AbstractEndPoint中实现的,并调用推迟到NioEndPoint中的startInternal()方法。

    public final void start() throws Exception {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }

    /**
     * Start the NIO endpoint, creating acceptor, poller threads.
     */
    @Override
    public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            // 创建缓存容器
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());

            // 创建线程池
            if ( getExecutor() == null ) {
                createExecutor();
            }

            // 初始化计数器Latch
            initializeConnectionLatch();

            // 创建Poller线程
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; thread="" pollerthread="new" pre="">

在startInternal()方法中,最重要的是创建Poller和Acceptor线程。Acceptor线程处理serverSocketChannel的请求接收事件;Poller处理serverSocketChannel的读写事件。此时可以预想到,Acceptor线程专门负责接收客户端连接socketChannel,然后将socketChannel交给Poller线程读写。在实际中,Poller线程将socketChannel再次封装之后又开启另一个线程进行实际的数据处理。这样设计的目的是避免当某一个请求出现阻塞的时候,影响到整个服务器的接收、处理能力。 按接收请求,处理请求的逻辑,我们先观察Acceptor线程。

    protected class Acceptor extends AbstractEndpoint.Acceptor {

        @Override
        public void run() {
            int errorDelay = 0;
            // 一直循环直到接收停止命令
            while (running) {
                // 暂停命令
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //通过同步计数器来限制连接数目
                    //当连接数目超过上限时,则等待
                    //其中同步计算器是通过继承AQS实现的
                    //默认的最大连接数是10000
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        //接收连接,此处并不是使用selector实现
                        //在前面的代码中已知serverSock是阻塞模式的。
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        //we didn't get a socket
                        countDownConnection();
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // setSocketOptions() will add channel to the poller
                    // if successful
                    if (running && !paused) {
                        // 在setSocketOptions中将接收到的socket传给poller线程进行处理
                        if (!setSocketOptions(socket)) {
                            countDownConnection();
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } catch (SocketTimeoutException sx) {
                    // Ignore: Normal condition
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            try {
                                System.err.println(oomParachuteMsg);
                                oomt.printStackTrace();
                            }catch (Throwable letsHopeWeDontGetHere){
                                ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                            }
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }
    }

Acceptor线程会反复执行serverSock.accept()等待客户端连接的到来,等接收到一个客户端连接时,会把接收到的socket传给后续的poller线程处理,其执行过程在setSocketOptions()方法中。

    /**
     * Process the specified connection.
     */
    protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            // 设置为非阻塞模式
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);

            // 从NioChannel容器中获得一个NioChannel
            // NioChannel可以理解为socketChannel的代理类,提供更多的功能
            NioChannel channel = nioChannels.pop();
            if ( channel == null ) {
                // SSL相关
                if (sslContext != null) {
                    SSLEngine engine = createSSLEngine();
                    int appbufsize = engine.getSession().getApplicationBufferSize();

                    NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
                                                                       Math.max(appbufsize,socketProperties.getAppWriteBufSize()),
                                                                       socketProperties.getDirectBuffer());
                    channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
                } else {
                    // NioBufferHandler维护了在处理过程中的读写缓存
                    NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
                                                                       socketProperties.getAppWriteBufSize(),
                                                                       socketProperties.getDirectBuffer());
                    // 将socket、bufHandler封装到NioChannel中
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                if ( channel instanceof SecureNioChannel ) {
                    SSLEngine engine = createSSLEngine();
                    ((SecureNioChannel)channel).reset(engine);
                } else {
                    channel.reset();
                }
            }
            // 将niochannel注册到poller线程中进行处理
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(t);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

在setSocketOptions()方法中,将socket、niobufferhandler对象封装成niochannel对象,并调用poller线程的register()的方法注册到poller中,等待poller线程进行处理。 查看Poller线程的outline: 这里写图片描述 图2 Poller的outline Poller线程中的主要方法如下: Poller() : 在构造函数中创建打开selector。 destory() : 将close标志位置为true,表示要关闭poller。 addEvent(PollerEvent):内部方法调用,添加PollerEvent事件到队列中,并唤醒阻塞等待的selector。 add(NioChannel)、add(NioChannel, int):创建PollerEvent事件,并调用addEvent()方法将事件添加到队列中。 events():执行队列中的PollerEvent事件,注册读或写 register(NioChannel):外部方法调用,将NioChannel添加到队列中。 cancelledKey(SelectionKey, SocketStatus):取消注册到selector中的socket run():线程执行方法体,负责响应socket读写事件 processKey():内部方法调用,处理接收到的读写事件和队列中的事件

Poller执行从run()开始。

        @Override
        public void run() {
            // 循环直到destroy()方法被调用
            while (true) {
                try {
                    // 暂停处理
                    while (paused && (!close) ) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }

                    boolean hasEvents = false;
                    // 关闭
                    if (close) {
                        // 执行队列中的PollerEvent事件,注册读或写
                        events();
                        // 
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString(
                                    "endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    } else {
                        // 执行队列中的PollerEvent事件,注册读或写,
                        // hasEvents表示是否有读写事件注册
                        hasEvents = events();
                    }
                    try {
                        if ( !close ) {
                            // wakeupCounter > 0,表示有事件,故直接用selectNow,否则用select(selectorTimeout)以阻塞一段时间等待事件到来
                            if (wakeupCounter.getAndSet(-1) > 0) {
                                //if we are here, means we have other stuff to do
                                //do a non blocking select
                                keyCount = selector.selectNow();
                            } else {
                                keyCount = selector.select(selectorTimeout);
                            }
                            wakeupCounter.set(0);
                        }
                        // 此处在判断一次close,及时响应destroy()方法
                        if (close) {
                            events();
                            timeout(0, false);
                            try {
                                selector.close();
                            } catch (IOException ioe) {
                                log.error(sm.getString(
                                        "endpoint.nio.selectorCloseFail"), ioe);
                            }
                            break;
                        }
                    } catch (Throwable x) {
                        ExceptionUtils.handleThrowable(x);
                        log.error("",x);
                        continue;
                    }
                    //either we timed out or we woke up, process events first
                    if ( keyCount == 0 ) hasEvents = (hasEvents | events());

                    Iterator iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // Walk through the collection of ready keys and dispatch
                    // any active event.
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            // 记录时间
                            attachment.access();
                            iterator.remove();
                            // 将sk和attachtment包装,交由后续线程继续处理
                            processKey(sk, attachment);
                        }
                    }//while

                    //process timeouts
                    timeout(keyCount,hasEvents);
                    if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            System.err.println(oomParachuteMsg);
                            oomt.printStackTrace();
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                }
            }//while

            stopLatch.countDown();
        }

在上述流程中,核心流程是循环注册队列中的事件,监听selector,并将监听到的SelectionKey事件包装,交给线程池处理;还有其他一些附属流程,比如当Poller关闭时的处理以及超时处理。其处理流程如图3所示。 这里写图片描述 图3 Poller处理流程

1、events()方法处理事件注册

        public boolean events() {
            boolean result = false;

            PollerEvent pe = null;
            // 从队列中获得PollerEvent事件
            while ( (pe = events.poll()) != null ) {
                result = true;
                try {
                    // 调用PollerEvent的run()方法执行事件注册
                    pe.run();
                    pe.reset();
                    if (running && !paused) {
                        eventCache.push(pe);
                    }
                } catch ( Throwable x ) {
                    log.error("",x);
                }
            }

            return result;
        }

        public void run() {
            // 如果是注册,则把socket注册到selector中
            if ( interestOps == OP_REGISTER ) {
                try {
                    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
                } catch (Exception x) {
                    log.error("", x);
                }
            } else {
                final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
                    boolean cancel = false;
                    if (key != null) {
                        final KeyAttachment att = (KeyAttachment) key.attachment();
                        if ( att!=null ) {
                            att.access();//to prevent timeout
                            // interestOps相或,追加事件
                            int ops = key.interestOps() | interestOps;
                            att.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            cancel = true;
                        }
                    } else {
                        cancel = true;
                    }
                    if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
                }catch (CancelledKeyException ckx) {
                    try {
                        socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT);
                    }catch (Exception ignore) {}
                }
            }//end if
        }//run

在events()方法中,通过调用PollerEvent的run()方法将socket注册到selector中。

2、processKey() processKey()方法处理准备完毕的事件。

        protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
            boolean result = true;
            try {
                // 如果close,则取消sk
                if ( close ) {
                    cancelledKey(sk, SocketStatus.STOP);
                } else if ( sk.isValid() && attachment != null ) {
                    // 对于有效的socket,保证不会超时
                    attachment.access();//make sure we don't time out valid sockets
                    if (sk.isReadable() || sk.isWritable() ) {
                        if ( attachment.getSendfileData() != null ) {
                            // 处理文件
                            processSendfile(sk,attachment, false);
                        } else {
                            if ( isWorkerAvailable() ) {
                                // 
                                unreg(sk, attachment, sk.readyOps());
                                boolean closeSocket = false;
                                // Read goes before write
                                if (sk.isReadable()) {
                                    // 处理可读
                                    if (!processSocket(attachment, SocketStatus.OPEN_READ, true)) {
                                        closeSocket = true;
                                    }
                                }
                                // 处理可写
                                if (!closeSocket && sk.isWritable()) {
                                    if (!processSocket(attachment, SocketStatus.OPEN_WRITE, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (closeSocket) {
                                    cancelledKey(sk,SocketStatus.DISCONNECT);
                                }
                            } else {
                                result = false;
                            }
                        }
                    }
                } else {
                    //invalid key
                    cancelledKey(sk, SocketStatus.ERROR);
                }
            } catch ( CancelledKeyException ckx ) {
                cancelledKey(sk, SocketStatus.ERROR);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error("",t);
            }
            return result;
        }

在processKey()方法中,主要是处理可读、可写事件。查看processSocket(attachment, SocketStatus.OPEN_READ, true)。

    protected boolean processSocket(KeyAttachment attachment, SocketStatus status, boolean dispatch) {
        try {
            if (attachment == null) {
                return false;
            }
            // 将selectionKey包装为SocketProcessor 
            SocketProcessor sc = processorCache.pop();
            if ( sc == null ) sc = new SocketProcessor(attachment, status);
            else sc.reset(attachment, status);
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                // 交给线程池处理或直接运行
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            log.warn(sm.getString("endpoint.executor.fail", attachment.getSocket()), ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

在processSocket()方法中,将读写的selectionKey包装为SocketProcessor,并交给线程池处理。 至此,Connector中处理请求的流程已经结束。至于如何处理请求中的数据,将在后续代码中继续执行。

总结: 在Tomcat 8中,Connector负责处理客户端的连接请求,其核心实现在EndPoint类中。EndPoint中定义了Acceptor来接收客户端的socket,并将socket交给Poller线程进行读写处理。Poller线程监听到读写事件后,继续将selectionKey封装成Processor交给后续代码处理。

点击复制链接 与好友分享!回本站首页
相关TAG标签 Tomcat Connector部分
上一篇:一天一个设计模式---模板方法模式
下一篇:Hazelcast集群原理分析
相关文章
图文推荐
文章
推荐
点击排行

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站