menu 冷 の Codeworld
search self_improvement
目录

Netty网络框架

冷环渊
冷环渊 2022年04月03日  ·  阅读 362

Netty网络框架

(一) 基础篇

1、I/O基础

输入流:InputStream和Reader
输出流:OutputStream和Writer

​ 字节流 字符流

计算机最小的二进制单位 bit 比特 代表0和1
字节 1 byte = 8bit 计算机处理的最小单位
字符 1 char = 2byte = 16bit 人处理的最小单位

所以,字节流处理文件、图片、视频等二进制数据,而字符流处理文本数据。

2、Socket

原意是“插座”,在计算机领域中,翻译为“套接字”。
本质上,是计算机之间进行通信的一种方式。

Linux,“一切皆文件”,给每个文件映射一个ID,叫做"文件描述符"。
当处理网络连接时,也会看成一个文件,read/write变成和远程计算机的交互。

OSI七层模型 = Open System Interconnection 开放式系统互联
从下到上分别为:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。

实际应用的是优化后的TCP/IP模型(四层)
网络接口层/链路层、网络层、传输层、应用层

应用层协议:HTTP、FTP、SMTP(邮件协议)
传输层协议:TCP、UDP

Socket其实是应用层与传输层之间的抽象层,是一组接口。
在设计模式中,是门面模式。

3、NIO

BIO - BlockingIO 同步阻塞
NIO - New IO / Non-Blocking IO 同步非阻塞
AIO - Asynchronous IO 异步非阻塞

同步和异步,关注的是消息通知的机制
阻塞和非阻塞,关注的是等待消息过程中的状态

image-20220412192629185

多路复用的模型

image-20220412192637464

三大元素:Channel 、Buffer、Selector

1) Channel

FileChannel 文件管道的数据
Pipe.SinkChannel
Pipe.SourceChannel 线程间通信的管道
ServerSocketChannel
SocketChannel 用于TCP网络通信的管道
DatagramChannel 用于UDP网络通信的管道

2) Buffer

image-20220412192652620

capacity 总体容量大小
limit 存储容量的大小,是可读写和不可读写的界线
position 已读容量的大小,已读和未读区域的界线

【使用原理】
a) 初始化,给定总容量,position=0, limit=capacity
b) 当使用put方法存入数据是,通过position来记录存储的容量变化,position不断后移,直到存储结束(写完成)
c)写完成需要调用flip方法刷新,limit=position,position=0
保障limit记录的是可读写区域的大小,position已读部分重置为空
d) 读数据直到读完成,需要调用clear方法,position=0, limit=capacity

3) Selector

三个元素: Selector选择器、SelectableChannel可选择的通道、SelectionKey选择键

本质上,Selector是监听器,监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写),使用SelectionKey代表具体的事件,在确保通道是可选择的情况下,将通道注册进选择器中,此时Selector维护的是,通道和事件之间的关联关系。

image-20220412192659118

Selector,管理被注册的通道集合,以及他们的状态
SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的api。
FileChannel就不是可选择的,Socket相关的通道都是可选择的
一个通道可以被注册到多个选择器上吗? 可以的
多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次

SelectionKey,封装了要监听的事件,连接、接收、读、写。
一方面,Selector关心通道要处理哪些事件
另一方面,当事件触发时,通道要处理哪些事件

【使用方式】

a、首先通过open方法,获取通道,将通道设置为非阻塞的
b、通过open方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT)
c、轮询选择器,当前是否有要处理的操作 select() > 0?
如果有,要获取,待处理操作的集合Set<SelectionKey> , 进行遍历
遍历到SelectionKey时,判断对应哪种操作,不同的操作设置不同的处理方式
如OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如OP_WRITE
如OP_WRITE,通过key的方法获取通道本身,读取数据并继续监听事件,如OP_READ

4、零拷贝

需求:将磁盘中的文件读取出来,通过socket发送出去

传统的拷贝方式(4次)
Socket网络缓冲区,也属于操作系统的内核缓冲区。

image-20220412192706450

在操作系统中进行的拷贝(如第二次和第三次),叫做CPU拷贝。
连接磁盘或网卡等硬件的拷贝(如第一次和第四次),叫做DMA拷贝。

零拷贝的概念:减少CPU拷贝的次数。

零拷贝是基于操作系统层面的优化方式(以下基于Linux系统)

1) mmap = memory mapping 内存映射

image-20220412192746768

2)sendfile (linux2.1内核支持)

image-20220412192758187

  1. sendfile with scatter/gather copy(批量sendfile)
    从单个文件的处理,上升到多个物理地址的处理,提高处理速度

4)splice (拼接,在linux2.6内核支持)

​ 在操作系统内核缓冲区和Socket网络缓冲区之间建立管道,来减少拷贝次数。

image-20220412192804981

线程模型

在这里插入图片描述

1) 单线程Reactor模型

顾名思义 就是使用一个线程来处理问题 线程中

  • selector
  • 事件处理 : 连接事件
  • 处理事件:handler

在这里插入图片描述

单线程服务器

public class ReactorServer {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;

    public ReactorServer() {
        try {
            // 初始化监听器 与 channel 通道
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // 配置为非阻塞的
            serverSocketChannel.configureBlocking(false);

            // 配置通道连接地址 开放 9090 端口
            SocketAddress address = new InetSocketAddress(9090);
            serverSocketChannel.socket().bind(address);

            //将channel 注册到 selector监听通道事件  达到多路复用
            //首个注册事件一般都是 accept 连接事件
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            //    创建处理连接事件的 acceptor
            // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生
            Acceptor acceptor = new Acceptor(selector, serverSocketChannel);
            //附加一个对象 用来处理事件
            key.attach(acceptor);
            while (true) {
                //返回事件的个数 处理事件
                int num = selector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = selector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //如有接收的时Accpet 事件 获取的就是Acceptor 事件
                    //如果接受的时读写事件 获取的就是 Handler 事件
                    Runnable runnable = (Runnable) key.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

    }
}

Accpetor 连接事件

public class Acceptor implements Runnable {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;

    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void run() {
        try {
            //接受客户端传入的连接时 Socket Channel
            SocketChannel socketChannel = serverSocketChannel.accept();
            //设置异步
            socketChannel.configureBlocking(false);
            SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
            // 创造处理器 处理连接
            //单线程
            //Handler handler = new Handler(key);
            //多线程
            MultHandler handler = new MultHandler(key);
            handler.run();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

Handler 单线程处理

public class Handler implements Runnable {
    private SelectionKey key;
    private State state;

    public Handler(SelectionKey key) {
        this.key = key;
        this.state = State.READ;
    }

    @Override

    public void run() {
        //处理 读写操作,判断读写
        switch (state) {
            case READ:
                read();
                break;
            case WRITE:
                write();
                break;
            default:
                break;
        }
    }


    /*轮流处理末尾添加事件达到循环处理*/
    //处理 读方法
    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //    通过通道获取KEY
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //将传入的数据写入到buffer中
            int num = channel.read(buffer);
            //    转化成String
            String msg = new String(buffer.array());
            //    增加业务处理
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            this.state = State.WRITE;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //处理 写方法
    private void write() {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        try {
            //    通过通道获取KEY
            SocketChannel channel = (SocketChannel) key.channel();
            channel.write(buffer);
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_READ);
            this.state = State.READ;
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    //记录状态 非读即写
    private enum State {
        //读写事件
        READ, WRITE
    }
}
2)多线程Reactor模型

提高handler的处理效率,首先handler不再负责具体的业务逻辑,当读取出数据后,分发给子线程处理,子线程处理完成后再将结果返回给handler,handler再将结果返回给客户端。

在这里插入图片描述

多线程处理 (handler使用线程池)

public class MultHandler implements Runnable {
    private SelectionKey key;
    private State state;

    private ExecutorService pool;

    public MultHandler(SelectionKey key) {
        this.key = key;
        this.state = State.READ;
    }

    @Override

    public void run() {
        //处理 读写操作,判断读写
        switch (state) {
            case READ:
                //将最耗时的操作 放入线程池执行
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        read();
                    }
                });
                break;
            case WRITE:
                write();
                break;
            default:
                break;
        }
    }


    /*轮流处理末尾添加事件达到循环处理*/
    //处理 读方法
    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //    通过通道获取KEY
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //将传入的数据写入到buffer中
            int num = channel.read(buffer);
            //    转化成String
            String msg = new String(buffer.array());
            //    增加业务处理
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            this.state = State.WRITE;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //处理 写方法
    private void write() {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        try {
            //    通过通道获取KEY
            SocketChannel channel = (SocketChannel) key.channel();
            channel.write(buffer);
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_READ);
            this.state = State.READ;
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    //记录状态 非读即写
    private enum State {
        //读写事件
        READ, WRITE
    }
}
3)主从Reactor模型

mainReactor用来接收连接事件,然后分发给acceptor,acceptor在处理过程中,直接将后续的读写事件,注册到slaveReactor之中,以此来达到分流。

在这里插入图片描述

主从监听器

//主从模型
public class MultReactorServer {
    private Selector mainselector;
    private Selector slaveselector;
    private ServerSocketChannel serverSocketChannel;

    public MultReactorServer() {
        try {
            // 主 reactor 处理连接事件
            mainselector = Selector.open();
            //从reactor 处理读写事件
            slaveselector = Selector.open();
            // 配置为非阻塞的
            serverSocketChannel.configureBlocking(false);

            // 配置通道连接地址 开放 9090 端口
            SocketAddress address = new InetSocketAddress(9090);
            serverSocketChannel.socket().bind(address);

            //将channel 注册到 selector监听通道事件  达到多路复用
            //首个注册事件一般都是 accept 连接事件 (参数变化)
            SelectionKey key = serverSocketChannel.register(mainselector, SelectionKey.OP_ACCEPT);
            //    创建处理连接事件的 acceptor
            // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生 (参数变化)
            Acceptor acceptor = new Acceptor(slaveselector, serverSocketChannel);
            //附加一个对象 用来处理事件
            key.attach(acceptor);
            //主从监听逻辑分离
            new HandlerLoop(slaveselector).run();
            while (true) {
                //返回事件的个数 处理事件
                int num = mainselector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = mainselector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //只处理主Reactor 只处理连接事件
                    Runnable runnable = (Runnable) key.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

主从事件处理读写分离

//用于处理从reactor事件监听
public class HandlerLoop implements Runnable {

    private Selector selector;

    public HandlerLoop(Selector selector) {
        this.selector = selector;
    }


    @Override
    public void run() {
        try {
            while (true) {
                //返回事件的个数 处理事件
                int num = selector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = selector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //只处理从reactor 所以 接受的一定是读写事件
                    Runnable runnable = (Runnable) selectionKey.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(二) 应用篇

1、HTTP

1) 0.9版本

GET /index.html

服务端只能返回html格式,传输过程只能处理文字。

2) 1.0版本

支持任何格式的内容,包括图像、视频、二进制等等

引入了POST命令、HEAD命令

增加了请求头、状态码,以及权限、缓存等

GET / HTTP/1.0
User-Agent:Mozilla/1.0
Accept: */*
HTTP/1.0 200 OK
Content-Type: text/plain
Content-Encoding: gzip

<html>
  <body> hello world </body>
</html>

a、 Content-Type

服务端通知客户端,当前数据的格式

示例: text/html 、 image/png 、 application/pdf 、 video/mp4

前面是一级类型,后面是二级类型,用斜杠分隔; 还可以增加其他参数,如编码格式。

Content-Type: text/plain; charset=utf-8

b、Content-Encoding

表示数据压缩的方式,gzip、compress、deflate

对应客户端的字段为 Accept-Encoding,代表接收哪些压缩方式

c、缺点和问题

每个TCP连接只能发送一个请求,发送完毕连接关闭,使用成本很高,性能较差。

Connection: keep-alive   - 非标准字段

3) 1.1版本

GET / HTTP/1.1
User-Agent: PostmanRuntime/7.24.1
Accept: */*
Cache-Control: no-cache
Postman-Token: 636ce8a6-7eab-451a-8638-4534a3578095
Host: cn.bing.com
Accept-Encoding: gzip, deflate, br
Connection: keep-alive

HTTP/1.1 200 OK
Cache-Control: private, max-age=0
Content-Length: 45786
Content-Type: text/html; charset=utf-8
Content-Encoding: br
Vary: Accept-Encoding
P3P: CP="NON UNI COM NAV STA LOC CURa DEVa PSAa PSDa OUR IND"
Set-Cookie: SRCHD=AF=NOFORM; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: SRCHUID=V=2&GUID=5C28FF778A2C4A00B32F5408147038BF&dmnchg=1; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: SRCHUSR=DOB=20200622; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: _SS=SID=23AE726877396796143D7C99761766C7; domain=.bing.com; path=/
Set-Cookie: _EDGE_S=F=1&SID=23AE726877396796143D7C99761766C7; path=/; httponly; domain=bing.com
Set-Cookie: _EDGE_V=1; path=/; httponly; expires=Sat, 17-Jul-2021 07:03:23 GMT; domain=bing.com
Set-Cookie: MUID=1CB3B15BCBD2637E2C01BFAACAFC6214; samesite=none; path=/; secure; expires=Sat, 17-Jul-2021 07:03:23 GMT; domain=bing.com
Set-Cookie: MUIDB=1CB3B15BCBD2637E2C01BFAACAFC6214; path=/; httponly; expires=Sat, 17-Jul-2021 07:03:23 GMT
X-MSEdge-Ref: Ref A: 71D6F4B3BA9C448EB453341AC182C7BC Ref B: BJ1EDGE0311 Ref C: 2020-06-22T07:03:23Z
Date: Mon, 22 Jun 2020 07:03:23 GMT

a、持久连接,含义为默认不关闭tcp连接,可以被多个请求复用。大多时候,浏览器对同一个域名,允许同时建立6个连接。

b、管道机制,支持客户端发送多个请求,管理请求的顺序的。服务器还是按照接受请求的顺序,返回对应的响应结果。

c、Content-Length, 用来区分数据包的重要字段

d、支持PUT、DELETE、PATCH等命令

缺点和问题

当部分请求耗时较长时,仍会阻塞后续请求的处理速度,这种现象叫做“队头阻塞”/"线头阻塞"。

4) 2.0版本

解决队头阻塞的问题,使用的是多路复用的方式。

说了这么多 上代码来操作一下吧!!

我们的编写思路是这样的

  • 编写初始化服务端
  • 编写自定义初始化器 和 自定义处理器
  • 启动postman 查看我们设置的 http 的响应结果

我们这里有三个类

  • HttpServer 初始化服务端
  • MyHttpHandler 自定义处理器
  • MyHttpInitializer 自定义初始化

首先是 server

我们需要在初始化服务端的时候 设置主从线程模型(Netty中常用)
设置 启动参数 和阻塞队列的长度等设置
设置 初始化

public class HttpServer {
    public static void main(String[] args) {
        //可以自定义线程的数量
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 默认创建的线程数量 = CPU 处理器数量 *2
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler())
                //当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度
                .option(ChannelOption.SO_BACKLOG, 128)
                //设置连接保持为活动状态
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new MyHttpInitializer());

        try {
        //设置为异步启动 异步 关闭
            ChannelFuture future = serverBootstrap.bind(9988).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
       		//netty的优雅关闭 指 等一切执行完毕之后 慢慢的关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

编写 初始化

继承ChannelInitializer泛型为Channel,用来进行设置出站解码器和入站编码器
使用 codec netty封装好的解码器,这样我们就不用每次定义 解码和编码

public class MyHttpInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //先对应请求解码,后对响应解码
        //pipeline.addLast("decoder", new HttpRequestDecoder());
        //pipeline.addLast("encoder", new HttpRequestEncoder());

        //当然每次我们都要解码编码很麻烦,netty也有为我们提供对应的解决方案,建议直接使用这个 不会出错
        pipeline.addLast("codec", new HttpServerCodec());
        //压缩数据
        pipeline.addLast("compressor", new HttpContentCompressor());
        //聚合完整的信息 参数代表可以处理的最大值 此时的是 512 kb
        pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
        pipeline.addLast(new MyHttpHandler());
    }
}

有了初始化,我们还需要一个做事的 那就是 处理器 Handler

netty帮我们封装了返回完整http响应的类 DefaultFullHttpResponse
我们只需要在读取的时候 设置协议,状态码和响应信息,
配置响应头的类型和长度 就可以完成对请求的响应

/*
 * 泛型需要设置为 FullHttpRequest
 * 筛选 message 为此类型的消息才处理
 * */
public class MyHttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception {
        //DefaultFullHttpResponse 是一个默认的完整的http响应
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
                Unpooled.wrappedBuffer("hello http netty demo".getBytes())
        );
        //    我们还需要设置响应头
        // 设置请求 响应头字段 可以使用 HttpHeaderNmaes
        // 设置字段时 可以使用
        HttpHeaders headers = response.headers();
        headers.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + ";charset=UTF-8");
        headers.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        ctx.write(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

启动结果
在这里插入图片描述
postman 差看到的请求响应 参数
可以看到我们设置的 http1.1 协议
类型 text/plain 长度 47
响应给客户端的 内容 hello http netty demo

在这里插入图片描述

小结

  1. 了解 http 各个版本的解决了什么问题,优缺点,优劣性
  2. 手动编写一个服务端的响应,用postman 查看响应头我们设置的内容
  3. 体验到 netty强大的封装带给我们的便利性

2、WebSocket

websocket是由浏览器发起的

协议标识符 http://127.0.0.1:8080 ws://127.0.0.1:7777

GET ws://127.0.0.1:7777 HTTP/1.1
Host: 127.0.0.1
Upgrade: websocket    # 升级为ws   
Connection: Upgrade   # 此链接需要升级
Sec-WebSocket-key: client-random-string ...  # 标识加密相关信息
HTTP/1.1 101
Upgrade: websocket
Connection: Upgrade

响应码 101 代表本次协议需要更改为websocket

连接建立后,支持文本信息及二进制信息。

Websocket实现的原理:
通过http协议进行连接的建立(握手和回答),建立连接后不再使用http,而tcp自身是支持双向通信的,所以能达到“全双工”的效果。

通信使用的单位叫帧 frame
客户端:发送时将消息切割成多个帧
服务端:接收时,将关联的帧重新组装

【客户端】

var ws = new WebSocket("ws://127.0.0.1:7777/hello");
ws.onopen = function(ev){
     ws.send("hello"); //建立连接后发送数据
}

设计一个样式
左右两个各有一个文本框,中间放一个发送按钮。
左侧文本框用来发送数据,右侧文本框用来显示数据。

Websocket 应用demo

服务端代码

public class WebSocketServer {
    public static void main(String[] args) {
        //可以自定义线程的数量
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 默认创建的线程数量 = CPU 处理器数量 *2
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler())
                //当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度
                .option(ChannelOption.SO_BACKLOG, 128)
                //设置连接保持为活动状态
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new WebSocketInitialzer());

        try {
            ChannelFuture future = serverBootstrap.bind(7777).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

服务端初始化器

public class WebSocketInitialzer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //增加编解码器 的另一种方式
        pipeline.addLast(new HttpServerCodec());
        //    块方式写的处理器 适合处理大数据
        pipeline.addLast(new ChunkedWriteHandler());
        //聚合
        pipeline.addLast(new HttpObjectAggregator(512 * 1024));
        /*
         * 这个时候 我们需要声明我们使用的是 websocket 协议
         * netty为websocket也准备了对应处理器  设置的是访问路径
         * 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了
         * 这个handler是将http协议升级为websocket  并且使用 101 作为响应码
         * */
        pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
        pipeline.addLast(new WebSocketHandler());
    }
}

服务端处理器

通信使用的单位叫帧 frame
客户端:发送时将消息切割成多个帧
服务端:接收时,将关联的帧重新组装

/*
 * 泛型 代表的是处理数据的单位
 * TextWebSocketFrame : 文本信息帧
 * */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        //可以直接调用text 拿到文本信息帧中的信息
        System.out.println("msg:" + textWebSocketFrame.text());
        Channel channel = ctx.channel();
        //我们可以使用新建一个对象 将服务端需要返回的信息放入其中 返回即可
        TextWebSocketFrame resp = new TextWebSocketFrame("hello client from websocket server");
        channel.writeAndFlush(resp);
    }
}

websocket前端编写

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    var socket;
    //    判断当前浏览器是否支持websocket
    if (!window.WebSocket) {
        alert("不支持websocket")
    } else {
        <!-- 创建websocket 连接对象-->
        socket = new WebSocket("ws://127.0.0.1:7777/hello");
        //设置开始连接的方法
        socket.onopen = function (ev) {
            var tmp = document.getElementById("respText");
            tmp.value = "连接已经开启";
        }
        //设置关闭连接的方法
        socket.onclose = function (ev) {
            var tmp = document.getElementById("respText");
            tmp.value = tmp.value + "\n" + "连接已经关闭";
        }
        //设置接收数据的方法
        socket.onmessage = function (ev) {
            var tmp = document.getElementById("respText");
            tmp.value = tmp.value + "\n" + ev.data;
        }
    }

    function send(message) {
        if (!window.socket) {
            return
        }
        /*
        * 判断socket的状态
        * connecting 正在连接 closing 正在关闭
        * closed 已经关闭或打开连接失败
        * open 连接可以 已经正常通信
        * */
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("连接未开启");
        }
    }
</script>
<!--防止表单自动提交-->
<form onsubmit="return false">
    <textarea name="message" style="height: 400px;width: 400px"></textarea>
    <input type="button" value="发送" onclick="send(this.form.message.value)">
    <textarea id="respText" style="height: 400px;width: 400px"></textarea>
</form>
</body>
</html>

【客户端】

var ws = new WebSocket("ws://127.0.0.1:7777/hello");
ws.onopen = function(ev){
     ws.send("hello"); //建立连接后发送数据
}

设计一个样式
左右两个各有一个文本框,中间放一个发送按钮。
左侧文本框用来发送数据,右侧文本框用来显示数据。

演示效果

启动服务发送消息

小结

  1. websocket 一般用于做可复用连接,http一般做短链接
  2. websocket解决了http连接只能客户端发起的

(三)原理篇

1、ByteBuf

NIO中ByteBuffer的缺点:

A 长度固定,无法动态的扩容和缩容,缺乏灵活性
B 使用一个position记录读写的索引位置,在读写模式切换时需手动调用flip方法,增加了使用的复杂度。
C 功能有限,使用过程中往往需要自行封装

1)分类

按照内存的位置,分为堆内存缓冲区 heap buffer、直接内存缓冲区direct buffer、复合内存缓冲区composite buffer。

A heap buffer

将数据存储到JVM的堆空间中,实际使用字节数组byte[]来存放。
优点:数据可以快速的创建和释放,并且能够直接访问内部数组
缺点:在读写数据时,需要将数据复制到直接缓冲区 再进行网络传输。

B direct buffer

不在堆中,而是使用了操作系统的本地内存。
优点:在使用Socket进行数据传输过程中,减少一次拷贝,性能更高。
缺点:释放和分配的空间更昂贵,使用时需要更谨慎。

C composite buffer

将两个或多个不同内存的缓冲区合并
优点:可以统一进行操作

应用场景:在通信线程使用缓冲区时,往往使用direct buffer,而业务消息使用缓冲区时,往往使用heap buffer,在解决http包,请求头+请求体特性不同而选择不同位置存储时,可以将两者拼接使用

D 池化的概念

对于内存空间分配和释放的复杂度和效率,netty通过内存池的方式来解决。
内存池,可以循环利用ByteBuf,提高使用率。但是管理和维护较复杂。

Unpooled正是非池化缓冲区的工具类。

主要区别在于,池化的内存由netty管理,非池化的内存由GC回收。

E 回收方式

回收方式为引用计数,具体规则为,通过记录被引用的次数,判断当前对象是否还会被使用。
当对象被调用时,引用计为+1,当对象被释放时,引用计为-1,当引用次数为0时,对象可以回收。

弊端:可能引发内存泄漏。
当对象不可达,JVM会通过GC回收掉,但此时引用计数可能不为0,对象无法归还内存池,会导致内存泄漏。netty只能通过对内存缓冲区进行采样,来检查。

2)工作原理

和ByteBuffer不同在于,增加了一个指针,通过两个指针记录读模式和写模式时的索引位置,读指针叫做readerIndex,写指针叫做writerIndex。

A 读写分离

image-20220412193227329

image-20220412193237587

当执行clear()方法时,索引位置清空回初始位置,但数据保持不变。

mark和reset方法在ByteBuf中同样适用,如markReaderIndex和resetReaderIndex。

B 深浅拷贝

浅拷贝,拷贝的是对对象的引用,并没有创建新对象,新对象和原对象之间互相影响。
深拷贝,拷贝的是整个对象,和原对象之间完全独立。

duplicate和slice方法,达成全部浅拷贝和部分浅拷贝。
copy,部分深拷贝,部分代表的是可读空间。

3)扩容机制

A ByteBuffer的存储

ByteBuffer在put数据时,会校验剩余空间是否不足,如果不足,会抛出异常。

ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.put("yu".getBytes());

----------------------------------------------------

    public final ByteBuffer put(byte[] src) {
        return put(src, 0, src.length);
    }
    
    // 额外接收偏移量(存储数据的起始位置)  和数据长度
    public ByteBuffer put(byte[] src, int offset, int length) {
        // 校验参数的有效性
        checkBounds(offset, length, src.length);
        // 如果要存储数据的长度 > 剩余可用空间  抛出buffer越界的异常
        if (length > remaining())
            throw new BufferOverflowException();
        // 如果剩余空间足够  计算存储的结束位置 = 偏移量 + 数据长度    
        int end = offset + length;
        for (int i = offset; i < end; i++)
            this.put(src[i]);
        return this;
    }    

如果要手动对ByteBuffer扩容,可以在put之前,先校验剩余空间是否足够,如果不足够,创建一个新的ByteBuffer,新的容量确保足够,旧的buffer数据拷贝到新的buffer中,然后继续存储数据。

B ByteBuf的存储和扩容

当写数据时,先判断是否需要扩容,如果当前空间较小(<4M),以64作为基数倍增(10 -> 64 -> 128 -> 256), 如果当前空间较大(>4M), 每次扩容都增加4M,这种方式叫做"步进式"。

查看源码,以AbstractByteBuf子类为依据查看,最重要的子类之一,ByteBuf的公共属性和功能都在此中实现。

ByteBuf buf = Unpooled.buffer(10);
System.out.println("capacity: " + buf.capacity());
for (int i = 0; i < 11; i++) {
    buf.writeByte(i);
}
----------------------------------------------------   

[ByteBuf类]
public abstract ByteBuf writeByte(int value);

按住Ctrl+Alt快捷键

[AbstractByteBuf子类]
---------------------------------------------------- 
    @Override
    public ByteBuf writeByte(int value) {
        // 确保可写空间足够
        ensureWritable0(1);
        // 写入数据
        _setByte(writerIndex++, value);
        return this;
    }
    
    // 参数为 最小写入数据的大小
    final void ensureWritable0(int minWritableBytes) {
        final int writerIndex = writerIndex();
        // 目标容量 = 当前写操作索引 + 最小写入数据大小
        final int targetCapacity = writerIndex + minWritableBytes;
        // 容量足够  不需扩容
        if (targetCapacity <= capacity()) {
            ensureAccessible();
            return;
        }
        // 容量不足时 如果目标容量 超出最大容量  抛出异常
        if (checkBounds && targetCapacity > maxCapacity) {
            ensureAccessible();
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }
		
		// 扩容逻辑
        // 获取可写空间大小
        final int fastWritable = maxFastWritableBytes();
        // 如果 可写空间 >= 所需空间   新的容量=写操作索引+可写空间大小 
        // 如果 可写空间 < 所需空间   计算要扩容的新容量大小 calculateNewCapacity方法
        int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
                : alloc().calculateNewCapacity(targetCapacity, maxCapacity);

        // Adjust to the new capacity.
        
        // 计算完成后 生成新的ByteBuffer
        capacity(newCapacity);
    }
    
    // 获取可写空间大小
    public int maxFastWritableBytes() {
        return writableBytes();
    }
    

[AbstractByteBufAllocator子类]
---------------------------------------------------- 
    // 计算要扩容的新容量大小
    @Override
    public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        // 校验参数有效性
        checkPositiveOrZero(minNewCapacity, "minNewCapacity");
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        // 扩容方式的分界点 以4M大小为界
        final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If over threshold, do not double but just increase by threshold.、
        // 如果所需容量大于4M  按照步进的方式扩容 
        //   举例: 比如 minNewCapacity = 5M  
        if (minNewCapacity > threshold) {
            // newCapacity = 5 / 4 * 4 = 4M  确保是4的倍数
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                // newCapacity = 4 + 4 = 8M;
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        // 如果所需容量大于4M  按照64的倍数扩容  找到最接近所需容量的64的倍数
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
        
        // 保障在最大可接受容量范围内
        return Math.min(newCapacity, maxCapacity);
    }

4)优势

A 池化的方式提高内存使用率

B 提出了复合型缓冲区的整合方案

C 增加了索引,使读写分离,使用更便捷

D 解决了ByteBuffer长度固定的问题,增加了扩容机制

E 用引用计数的方式进行对象回收

2、Channel

1)Channel

channel是通讯的载体,对应通讯的一端,在BIO中对应Socket,NIO中对应SocketChannel,Netty中对应NioSocketChannel,ServerSocket同理。
channelhandler是通道的处理器,一个channel往往有多个handler
channelpipeline是handler的容器,装载并管理handler的顺序(本质是双向链表)

image-20220412193249568

如图,channel创建时,会对应创建一个channelpipeline,pipeline首先会记录一个头部的处理器handler,当pipeline进行分发时,先分发给头部,然后依次执行,执行handler全部执行完成。

同时,channel创建后,会注册进EventLoop之中,EventLoop会监听事件的发生。不同的事件调用handler不同的处理方法,让流程运转起来。

channel生命周期,对应四种状态,分别为:
A) ChannelUnregistered 已创建但还未被注册到监听器中
B) ChannelRegistered 已注册到监听器EventLoop中
C) ChannelActive 连接完成处于活跃状态,此时可以接收和发送数据
D) ChannelInactive 非活跃状态,代表连接未建立或者已断开

channelhandler生命周期,对应三种状态,分别为:
A) handlerAdded 把handler添加到pipeline之中
B) handlerRemoved 从pipeline中移除
C) exceptionCaught 在处理过程中有错误产生

创建channel源码分析

以服务端启动为例
ChannelFuture future = serverBootstrap.bind(8888).sync();

参数设置
serverBootstrap.channel(NioServerSocketChannel.class)

【AbstractBootstrap】 启动对象的父类
------------------------------------------------------------------
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        .......
    }
    
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } 
        .......
    }
   
   
 【ReflectiveChannelFactory】  工厂实现类
 ------------------------------------------------------------------   
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

在启动对象调用bind()或connect()方法时,会创建channel
本质上通过反射,使用工厂的反射实现类创建对应的实例,此时实例对象的类型是通过channel参数来设置的

2)ChannelHandler

类层次关系图

image-20220412193256540

入站和出站:
从服务端的角度,数据从客户端发送到服务端,称之为入站,当数据处理完成返回给客户端,称之为出站。是相对的概念。

从客户端的角度,数据从服务端发送给客户端,称之为入站,当数据返回给服务端,称之为出站。

不论是入站还是出站,handler从一端开始,到另一端结束,以责任链的模式依次执行。

责任链模式——"击鼓传花",当请求被不同的接收者处理时,每个接收者都包含对下一个接收者的引用,一个接收者处理完成后,将依次向下传递。

适配器模式——出国时要使用的电源转换器(美国/日本110V 中国220V电压),作为两个不兼容的接口之间的桥梁,将类的接口转换为需要的另外一种接口。

ChannelDuplexHandler是除了入站和出站handler之外的,另一个常用子类。
它同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,如果需要既处理入站事件又处理出站事件,可以继承此类。

 serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))
 
 ------------------------------------------------------------------
 public class LoggingHandler extends ChannelDuplexHandler{}
 
 ------------------------------------------------------------------
 public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}
 
  ------------------------------------------------------------------
  public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {}

ChannelHandlerAdapter
提供了额外的isSharable()方法,用来判断handler是否可以被共享到多个pipeline之中。默认情况不共享,如果需要共享,在继承了适配器的handler上,增加注解@Sharable

@Sharable
public class LoggingHandler extends ChannelDuplexHandler {}

ChannelInboundHandler
最重要的方法是channelRead(),在使用时,需要显式的释放ByteBuf相关的内存。使用ReferenceCountUtil是引用计数的工具类。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // netty中的缓冲区  叫做ByteBuf  -- 对ByteBuffer的封装
        ByteBuf buf = (ByteBuf) msg;
        // 释放ByteBuf内存
        ReferenceCountUtil.release(msg);
    }

为了减少对资源内存的管理,使用SimpleChannelInboundHandler,使用其channelRead0()方法,可以自动释放资源,使用更便利。

SimpleChannelInboundHandler源码
------------------------------------------------
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

3)ChannelPipeline

pipeline中维护入站和出站链路,两条链路的执行顺序。

handler只负责处理自身的业务逻辑,对通道而言,它是无状态的。通道的信息会保存到handlerContext处理器上下文中,它是连接pipeline和handler之间的中间角色。

pipeline管理的是由handlerContext包裹的handler,也就是说,当添加handler时,先将其转为handlerContext,然后添加到pipeline的双向链表中。头结点叫做HeadContext,尾节点叫做TailContext。

image-20220412193303101

 ch.pipeline().addLast(new NettyServerHandler());
 
 [DefaultChannelPipeline]
 ----------------------------------------------------------------
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
    
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }
    
    
    // 关键逻辑
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 检查当前handler是否支持共享,如果不支持,又被添加到其他pipeline中,会报错
            checkMultiplicity(handler);
		    // 将handler封装为context
            newCtx = newContext(group, filterName(name, handler), handler);
            // 将context添加到链表尾部
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            // 判断当前通道的注册状态,如果是未注册,执行此逻辑
            if (!registered) {
                // 添加一个任务,当通道被注册后,能够回调handlerAdded方法
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            // 如果已被注册  执行调用handlerAdded方法
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    
    
    
    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }
    
    
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    
    // 尾节点会提前声明并创建
    final AbstractChannelHandlerContext tail;
    
    //  prev -> tail   在其中插入newctx
    //  prev -> newctx -> tail   放到倒数第二个位置中  tail节点是保持不变的
    //  依次更改 新节点的前后指针   以及prev节点的后置指针和tail节点的前置指针
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    
    // 构造器中已经提前创建了头尾节点
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
    

使用pipeline模式的优点:
A) 解耦,让处理器逻辑独立,可以被多个channel共享
B) channel相关信息,交给context维护
C) 具有极大的灵活性,使用处理器可以方便的添加或删除,或者更改它们的顺序

3、EventLoop

EventLoop事件循环,监听IO事件,内部封装了线程

EventLoopGroup事件循环组,是对EventLoop的管理,封装了线程池。

当新建channel时,group会为其分配一个EventLoop,封装了nio中的Selector,监听通道中的所有事件,一个通道的生命周期内,所有操作都由相同的EventLoop所封装的线程处理。

同时,多个通道可以由一个EventLoop处理,是多对一的关系

1)EventLoopGroup

类层级结构
(通过选中NioEventLoopGroup源码 - 右键 - 选中Diagrams - 选中show diagram 展示出来)

image-20220412193309395

A 初始化流程
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();


[NioEventLoopGroup]
-------------------------------------------------------------------------
    public NioEventLoopGroup() {
        this(0);
    }
    
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    // 逐步增加参数
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    // 调用父类的构造器
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    
    
 [MultithreadEventLoopGroup]
------------------------------------------------------------------------- 
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    // 初始化线程数量的逻辑     线程数 = cpu核数 * 2
    private static final int DEFAULT_EVENT_LOOP_THREADS;
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    
[MultithreadEventExecutorGroup]    
------------------------------------------------------------------------- 
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    
    // 核心逻辑
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 根据线程数量  创建EventLoop的逻辑  
                // newChild()的具体实现在NioEventLoopGroup类中
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    
    
[NioEventLoopGroup]    
-------------------------------------------------------------------------     
     @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }   
    
    

2)EventLoop

重要属性为 Selector 及其父类的父类中的 Thread
Selector用于在channel创建之后,注册其中监听后续的I/O事件
Thread用于进行轮询,在channel注册之后启动线程

image-20220412193320506

A 注册channel
 ChannelFuture future = serverBootstrap.bind(8888).sync();
 
 [AbstractBootstrap]
 --------------------------------------------------------------------------
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    
    
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        .....
    }
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }  

注册的源码调用链路

ChannelFuture regFuture = config().group().register(channel);

[MultithreadEventLoopGroup]
------------------------------------------------------------------------------
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    
[SingleThreadEventLoop]    
------------------------------------------------------------------------------
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    
    
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
  
 
[AbstractChannel]
------------------------------------------------------------------------------
//  核心逻辑是 调用了 register0()方法
 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
				
			// channel中存在EventLoop类型的属性
            // 通道初始化时,会将指定的EventLoop与channel进行关联
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
        
        
     // 核心逻辑是调用了doRegister()方法
     private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }        
  

[AbstractNioChannel]
----------------------------------------------------------------------------
// 核心注册逻辑
protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 将通道注册进Selector中 此时感兴趣的事件是0
                // 并且将当前对象作为附加对象存入其中  等价selectionKey.attach()方法
                // 使用对象时   再通过selectionkey.attachment()方法取出对象
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
    
B 轮询事件的状态

image-20220412193329743

读取源码的思路:找到入口,找到核心逻辑。

AbstractChannel中register()方法,对eventLoop.execute()的调用,就是启动线程进行轮询的入口。

[SingleThreadEventExecutor]
-----------------------------------------------------------------------------
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
    
    private final Queue<Runnable> taskQueue;
    //  当前类维护了一个任务队列
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            // 启动线程
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
    
    
    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
    
    
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 真正的调用逻辑
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } 
                
                ......
            }
         }
     }

[NioEventLoop]
-----------------------------------------------------------------------------
protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT: 
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // 判断队列中是否存在任务
                            if (!hasTasks()) { 
                                // 如果不存在  调用select()进行获取
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            // 处理任务的核心逻辑
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

processSelectedKeys()是处理任务的核心逻辑,来自于NioEventLoop的run()方法调用

    // 处理事件集合
    private void processSelectedKeys() {
        // 如果selectedKeys(事件集合)没有值,重新获取,如果有值,直接处理
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    
     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            // 这是注册时,存储的附加对象,即为通道对象channel
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                // 处理具体事件
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
    
    // 处理具体事件
    //  判断事件类型,调用对应的逻辑处理
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

其中读事件的处理

unsafe.read();

[AbstractNioByteChannel]
----------------------------------------------------------------------------
public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

4、Bootstrap

引导,对应用程序进行配置,并让他运行起来的过程。

1)配置

必选参数:group 、 channel、 handler(服务端 -- childHandler)

group(): 指定一个到两个reactor

channel():指定channel工厂,反射的方式创建channel使用

handler():指定reactor的处理器,其中childHandler指定的是,服务端所接收到的客户端channel使用的处理器,而服务端的主reactor(bossGroup),已经默认添加了acceptor处理器,所以可以不指定。

option():指定TCP相关的参数,以及netty自定义的参数

配置参数的过程,称之为初始化。

2)运行

        // 启动并设置端口号  但需要使用sync异步启动
        try {
            ChannelFuture future = serverBootstrap.bind(8888).sync();
            // 将关闭通道的方式 也设置为异步的
            //    阻塞finally中的代码执行
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

bind(),将服务端的channel绑定到端口号,然后接收客户端的连接,让整个netty运行起来。

sync(),因为绑定事件是异步的,所以使用sync同步等待结果,换句话说,bind只触发了绑定端口的事件,需要使用sync等待事件执行的结果。

future.channel().closeFuture().sync(),含义为,当通道被关闭时,才执行后续的操作,sync使当前线程执行到此处阻塞,以确保不执行后续的shutdown方法。

3)源码解析

A 类声明
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

嵌套的泛型使用,可以达到,子类中返回子类本身的效果,具体如下:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
B init方法

工作内容

a、设置channel相关的选项参数
b、设置channel的属性键值对
c、添加对channel的IO事件处理器 (Acceptor角色)

void init(Channel channel) {
        // 设置channel相关的选项参数
        setChannelOptions(channel, newOptionsArray(), logger);
        // 设置channel的属性键值对
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                
                // 添加对channel的IO事件处理器 (Acceptor角色)
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

Acceptor分析

功能:将主reactor接收到的客户端通道,传递给从reactor

// ServerBootstrapAcceptor是ServerBootstrap的静态内部类
// netty将acceptor看作一个处理器,并且是入站处理器
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

// 具体的逻辑封装到channelRead()方法中
// 对客户端通道进行配置 , 然后注册到从Reactor中
public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 此时msg对应 服务端接收到的客户端通道
            final Channel child = (Channel) msg;
		    // 设置处理链路
            child.pipeline().addLast(childHandler);
			// 设置通道的配置项和参数
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                // childGroup 是从reactor的资源池 调用注册方法 注册客户端通道child
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {					
                        // 增加监听 获取注册的异步结果
                        // 如果注册失败 或者抛出异常  都会关闭channel
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
        
        
        // 如果处理客户端连接失败了,暂停一秒,然后继续接受
        // 为保障服务端能够尽可能多的处理客户端的连接  不受某一次处理失败的结果影响
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {
                // stop accept new connections for 1 second to allow the channel to recover
                // See https://github.com/netty/netty/issues/1328
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
            }
            // still let the exceptionCaught event flow through the pipeline to give the user
            // a chance to do something with it
            ctx.fireExceptionCaught(cause);
        }

4) Future和Promise

Future代表的是,一个还未完成的异步任务的执行结果。可以通过addListener方法,监听执行结果后进行相应的处理,此时它的状态可以分为未完成、已完成(成功、失败、主动取消)等。

对Future而言,状态只能读取,无法更改,又出现了Promise,但是Promise只能更改一次。

参照生活中的定额发票(Future)和机打发票(Promise)。

(四)拓展篇

1、心跳检测

检测逻辑:
1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。
2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费。
3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。

需求设计:
1) 客户端向服务端发送 “I am alive” , sleep一个随机时间,模拟空闲状态
2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数
3) 设定阈值,达到阈值时主动关闭连接

在这里插入图片描述

IdleStateHandler , 是netty提供的处理器
1)超过多长时间没有读 readerIdleTime

  1. 超过多长时间没有写 writerIdleTime
  2. 超过多长时间没有读和写 allIdleTime

底层实现检测的是 IdleStateEvent事件,通过管道传递给下一个handler处理,处理方法是userEventTriggered。

其中IdleStateEvent事件,分为READER_IDLE、WRITER_IDLE、ALL_IDLE三大类

2、TCP粘包拆包

TCP是基于流的。

当客户端发送多个包,服务端只收到一个包,此时发生了粘包。
当客户端发送多个包,服务端也接收到多个包,但是包是不完整,多了或少了数据,此时发生了拆包。

UDP会发生拆包和粘包吗?
不会,UDP是基于报文的,在UDP的首部使用16bit存储报文的长度,因此不会发生。

TCP发生粘包和拆包的本质原因:
要发送的数据先经过TCP的缓冲区,还限制了最大报文长度。
A 如果要发送的数据 > TCP剩余的缓冲区大小,发生拆包
B 如果要发送的数据 > 最大报文长度,发生拆包
C 如果要发送的数据 << TCP剩余的缓冲区大小,发生粘包
D 接收数据的应用层,没有及时读取缓冲区数据,也会发生粘包

解决办法:
A 设置出消息的长度
B 设置出消息的边界——分隔符

Netty提供的解码器,两类

A 基于长度的解码器,在包头部设置出数据的长度。(类似于UDP的头部处理)
LengthFieldBasedFrameDecoder 自定义长度的处理方式
FixedLengthFrameDecoder 固定长度的处理方式

B 基于分隔符的解码器
DelimiterBasedFrameDecoder 自定义分隔符的处理方式
LineBasedFrameDecoder 行尾("\n"或"\r\n")分隔符的处理方式

【Demo逻辑】
需求:客户端循环100次向服务端请求时间

1)第一种方式,传输的过程数据单位是字节流ByteBuf,需要自行处理分隔符以及数据的长度,此时会出现粘包和拆包的问题

2)第二种方式,使用LineBasedFrameDecoder,配合StringDecoder使用,传输的数据单位变成字符串,可以直接处理,保证业务逻辑上的包和真正传输的包是一致的

3、序列化框架——protobuf

protobuf = protocol buffers
类似于xml的生成和解析,但效率更高,生成的是字节码,可读性稍差。

【Demo逻辑】

1)安装idea插件,protobuf support

​ 如果安装之后,创建*.proto文件没有使用插件,手动设置关联关系

​ settings -> file types -> 找到protobuf -> 增加正则表达式

2)引入maven依赖和插件

<properties>
    <os.detected.classifier>windows-x86_64</os.detected.classifier>
</properties>

<build>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.0</version>
                <configuration>
                    <protocArtifact>
                        com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}
                    </protocArtifact>
                    <pluginId>grpc-java</pluginId>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3)在右侧maven project中可以找到相应的插件 (没有的话刷新)
image-20220412193415944

4)在和java平级的目录下,创建proto文件夹,然后创建person.proto文件
5)person.proto

// 声明包名称的空间
syntax="proto3";
// 具体的类生成目录
option java_package="com.duing";
// 具体的类名
option java_outer_classname="PersonModel";

// 类结构
message Person{
    int32 id = 1;    // 此处的1代表顺序
    string name = 2;
}
  1. 使用插件进行编译,将编译生成的代码拷贝到需要的目录下
    7)编写测例进行序列化和反序列化操作
分类:
标签: Netty