Netty网络框架

Netty网络框架
(一) 基础篇
1、I/O基础
输入流:InputStream和Reader
输出流:OutputStream和Writer
字节流 字符流
计算机最小的二进制单位 bit 比特 代表0和1
字节 1 byte = 8bit 计算机处理的最小单位
字符 1 char = 2byte = 16bit 人处理的最小单位
所以,字节流处理文件、图片、视频等二进制数据,而字符流处理文本数据。
2、Socket
原意是“插座”,在计算机领域中,翻译为“套接字”。
本质上,是计算机之间进行通信的一种方式。
Linux,“一切皆文件”,给每个文件映射一个ID,叫做"文件描述符"。
当处理网络连接时,也会看成一个文件,read/write变成和远程计算机的交互。
OSI七层模型 = Open System Interconnection 开放式系统互联
从下到上分别为:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。
实际应用的是优化后的TCP/IP模型(四层)
网络接口层/链路层、网络层、传输层、应用层
应用层协议:HTTP、FTP、SMTP(邮件协议)
传输层协议:TCP、UDP
Socket其实是应用层与传输层之间的抽象层,是一组接口。
在设计模式中,是门面模式。
3、NIO
BIO - BlockingIO 同步阻塞
NIO - New IO / Non-Blocking IO 同步非阻塞
AIO - Asynchronous IO 异步非阻塞
同步和异步,关注的是消息通知的机制
阻塞和非阻塞,关注的是等待消息过程中的状态
多路复用的模型
三大元素:Channel 、Buffer、Selector
1) Channel
FileChannel 文件管道的数据
Pipe.SinkChannel
Pipe.SourceChannel 线程间通信的管道
ServerSocketChannel
SocketChannel 用于TCP网络通信的管道
DatagramChannel 用于UDP网络通信的管道
2) Buffer
capacity 总体容量大小
limit 存储容量的大小,是可读写和不可读写的界线
position 已读容量的大小,已读和未读区域的界线
【使用原理】
a) 初始化,给定总容量,position=0, limit=capacity
b) 当使用put方法存入数据是,通过position来记录存储的容量变化,position不断后移,直到存储结束(写完成)
c)写完成需要调用flip方法刷新,limit=position,position=0
保障limit记录的是可读写区域的大小,position已读部分重置为空
d) 读数据直到读完成,需要调用clear方法,position=0, limit=capacity
3) Selector
三个元素: Selector选择器、SelectableChannel可选择的通道、SelectionKey选择键
本质上,Selector是监听器,监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写),使用SelectionKey代表具体的事件,在确保通道是可选择的情况下,将通道注册进选择器中,此时Selector维护的是,通道和事件之间的关联关系。
Selector,管理被注册的通道集合,以及他们的状态
SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的api。
FileChannel就不是可选择的,Socket相关的通道都是可选择的
一个通道可以被注册到多个选择器上吗? 可以的
多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次
SelectionKey,封装了要监听的事件,连接、接收、读、写。
一方面,Selector关心通道要处理哪些事件
另一方面,当事件触发时,通道要处理哪些事件
【使用方式】
a、首先通过open方法,获取通道,将通道设置为非阻塞的
b、通过open方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT)
c、轮询选择器,当前是否有要处理的操作 select() > 0?
如果有,要获取,待处理操作的集合Set<SelectionKey> , 进行遍历
遍历到SelectionKey时,判断对应哪种操作,不同的操作设置不同的处理方式
如OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如OP_WRITE
如OP_WRITE,通过key的方法获取通道本身,读取数据并继续监听事件,如OP_READ
4、零拷贝
需求:将磁盘中的文件读取出来,通过socket发送出去
传统的拷贝方式(4次)
Socket网络缓冲区,也属于操作系统的内核缓冲区。
在操作系统中进行的拷贝(如第二次和第三次),叫做CPU拷贝。
连接磁盘或网卡等硬件的拷贝(如第一次和第四次),叫做DMA拷贝。
零拷贝的概念:减少CPU拷贝的次数。
零拷贝是基于操作系统层面的优化方式(以下基于Linux系统)
1) mmap = memory mapping 内存映射
2)sendfile (linux2.1内核支持)
- sendfile with scatter/gather copy(批量sendfile)
从单个文件的处理,上升到多个物理地址的处理,提高处理速度
4)splice (拼接,在linux2.6内核支持)
在操作系统内核缓冲区和Socket网络缓冲区之间建立管道,来减少拷贝次数。
线程模型
1) 单线程Reactor模型
顾名思义 就是使用一个线程来处理问题 线程中
- selector
- 事件处理 : 连接事件
- 处理事件:handler
单线程服务器
public class ReactorServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public ReactorServer() {
try {
// 初始化监听器 与 channel 通道
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
// 配置为非阻塞的
serverSocketChannel.configureBlocking(false);
// 配置通道连接地址 开放 9090 端口
SocketAddress address = new InetSocketAddress(9090);
serverSocketChannel.socket().bind(address);
//将channel 注册到 selector监听通道事件 达到多路复用
//首个注册事件一般都是 accept 连接事件
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 创建处理连接事件的 acceptor
// 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生
Acceptor acceptor = new Acceptor(selector, serverSocketChannel);
//附加一个对象 用来处理事件
key.attach(acceptor);
while (true) {
//返回事件的个数 处理事件
int num = selector.select();
if (num == 0) {
continue;
}
//没有跳过就代表有事件需要处理,拿到事件集合
Set<SelectionKey> SKeyset = selector.selectedKeys();
Iterator<SelectionKey> iterator = SKeyset.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//拿到事件的第一事情 移出事件 避免重复处理
iterator.remove();
//根据事件类型 分发 给监听器处理
//需要处理事情的时候 取出存储的对象
//如有接收的时Accpet 事件 获取的就是Acceptor 事件
//如果接受的时读写事件 获取的就是 Handler 事件
Runnable runnable = (Runnable) key.attachment();
runnable.run();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
}
}
Accpetor 连接事件
public class Acceptor implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
try {
//接受客户端传入的连接时 Socket Channel
SocketChannel socketChannel = serverSocketChannel.accept();
//设置异步
socketChannel.configureBlocking(false);
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
// 创造处理器 处理连接
//单线程
//Handler handler = new Handler(key);
//多线程
MultHandler handler = new MultHandler(key);
handler.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Handler 单线程处理
public class Handler implements Runnable {
private SelectionKey key;
private State state;
public Handler(SelectionKey key) {
this.key = key;
this.state = State.READ;
}
@Override
public void run() {
//处理 读写操作,判断读写
switch (state) {
case READ:
read();
break;
case WRITE:
write();
break;
default:
break;
}
}
/*轮流处理末尾添加事件达到循环处理*/
//处理 读方法
private void read() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 通过通道获取KEY
SocketChannel channel = (SocketChannel) key.channel();
try {
//将传入的数据写入到buffer中
int num = channel.read(buffer);
// 转化成String
String msg = new String(buffer.array());
// 增加业务处理
// 继续处理注册写事件
key.interestOps(SelectionKey.OP_WRITE);
this.state = State.WRITE;
} catch (Exception e) {
e.printStackTrace();
}
}
//处理 写方法
private void write() {
ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
try {
// 通过通道获取KEY
SocketChannel channel = (SocketChannel) key.channel();
channel.write(buffer);
// 继续处理注册写事件
key.interestOps(SelectionKey.OP_READ);
this.state = State.READ;
} catch (Exception e) {
e.printStackTrace();
}
}
//记录状态 非读即写
private enum State {
//读写事件
READ, WRITE
}
}
2)多线程Reactor模型
提高handler的处理效率,首先handler不再负责具体的业务逻辑,当读取出数据后,分发给子线程处理,子线程处理完成后再将结果返回给handler,handler再将结果返回给客户端。
多线程处理 (handler使用线程池)
public class MultHandler implements Runnable {
private SelectionKey key;
private State state;
private ExecutorService pool;
public MultHandler(SelectionKey key) {
this.key = key;
this.state = State.READ;
}
@Override
public void run() {
//处理 读写操作,判断读写
switch (state) {
case READ:
//将最耗时的操作 放入线程池执行
pool.execute(new Runnable() {
@Override
public void run() {
read();
}
});
break;
case WRITE:
write();
break;
default:
break;
}
}
/*轮流处理末尾添加事件达到循环处理*/
//处理 读方法
private void read() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 通过通道获取KEY
SocketChannel channel = (SocketChannel) key.channel();
try {
//将传入的数据写入到buffer中
int num = channel.read(buffer);
// 转化成String
String msg = new String(buffer.array());
// 增加业务处理
// 继续处理注册写事件
key.interestOps(SelectionKey.OP_WRITE);
this.state = State.WRITE;
} catch (Exception e) {
e.printStackTrace();
}
}
//处理 写方法
private void write() {
ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
try {
// 通过通道获取KEY
SocketChannel channel = (SocketChannel) key.channel();
channel.write(buffer);
// 继续处理注册写事件
key.interestOps(SelectionKey.OP_READ);
this.state = State.READ;
} catch (Exception e) {
e.printStackTrace();
}
}
//记录状态 非读即写
private enum State {
//读写事件
READ, WRITE
}
}
3)主从Reactor模型
mainReactor用来接收连接事件,然后分发给acceptor,acceptor在处理过程中,直接将后续的读写事件,注册到slaveReactor之中,以此来达到分流。
主从监听器
//主从模型
public class MultReactorServer {
private Selector mainselector;
private Selector slaveselector;
private ServerSocketChannel serverSocketChannel;
public MultReactorServer() {
try {
// 主 reactor 处理连接事件
mainselector = Selector.open();
//从reactor 处理读写事件
slaveselector = Selector.open();
// 配置为非阻塞的
serverSocketChannel.configureBlocking(false);
// 配置通道连接地址 开放 9090 端口
SocketAddress address = new InetSocketAddress(9090);
serverSocketChannel.socket().bind(address);
//将channel 注册到 selector监听通道事件 达到多路复用
//首个注册事件一般都是 accept 连接事件 (参数变化)
SelectionKey key = serverSocketChannel.register(mainselector, SelectionKey.OP_ACCEPT);
// 创建处理连接事件的 acceptor
// 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生 (参数变化)
Acceptor acceptor = new Acceptor(slaveselector, serverSocketChannel);
//附加一个对象 用来处理事件
key.attach(acceptor);
//主从监听逻辑分离
new HandlerLoop(slaveselector).run();
while (true) {
//返回事件的个数 处理事件
int num = mainselector.select();
if (num == 0) {
continue;
}
//没有跳过就代表有事件需要处理,拿到事件集合
Set<SelectionKey> SKeyset = mainselector.selectedKeys();
Iterator<SelectionKey> iterator = SKeyset.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//拿到事件的第一事情 移出事件 避免重复处理
iterator.remove();
//根据事件类型 分发 给监听器处理
//需要处理事情的时候 取出存储的对象
//只处理主Reactor 只处理连接事件
Runnable runnable = (Runnable) key.attachment();
runnable.run();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
主从事件处理读写分离
//用于处理从reactor事件监听
public class HandlerLoop implements Runnable {
private Selector selector;
public HandlerLoop(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (true) {
//返回事件的个数 处理事件
int num = selector.select();
if (num == 0) {
continue;
}
//没有跳过就代表有事件需要处理,拿到事件集合
Set<SelectionKey> SKeyset = selector.selectedKeys();
Iterator<SelectionKey> iterator = SKeyset.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//拿到事件的第一事情 移出事件 避免重复处理
iterator.remove();
//根据事件类型 分发 给监听器处理
//需要处理事情的时候 取出存储的对象
//只处理从reactor 所以 接受的一定是读写事件
Runnable runnable = (Runnable) selectionKey.attachment();
runnable.run();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(二) 应用篇
1、HTTP
1) 0.9版本
GET /index.html
服务端只能返回html格式,传输过程只能处理文字。
2) 1.0版本
支持任何格式的内容,包括图像、视频、二进制等等
引入了POST命令、HEAD命令
增加了请求头、状态码,以及权限、缓存等
GET / HTTP/1.0
User-Agent:Mozilla/1.0
Accept: */*
HTTP/1.0 200 OK
Content-Type: text/plain
Content-Encoding: gzip
<html>
<body> hello world </body>
</html>
a、 Content-Type
服务端通知客户端,当前数据的格式
示例: text/html 、 image/png 、 application/pdf 、 video/mp4
前面是一级类型,后面是二级类型,用斜杠分隔; 还可以增加其他参数,如编码格式。
Content-Type: text/plain; charset=utf-8
b、Content-Encoding
表示数据压缩的方式,gzip、compress、deflate
对应客户端的字段为 Accept-Encoding,代表接收哪些压缩方式
c、缺点和问题
每个TCP连接只能发送一个请求,发送完毕连接关闭,使用成本很高,性能较差。
Connection: keep-alive - 非标准字段
3) 1.1版本
GET / HTTP/1.1
User-Agent: PostmanRuntime/7.24.1
Accept: */*
Cache-Control: no-cache
Postman-Token: 636ce8a6-7eab-451a-8638-4534a3578095
Host: cn.bing.com
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
HTTP/1.1 200 OK
Cache-Control: private, max-age=0
Content-Length: 45786
Content-Type: text/html; charset=utf-8
Content-Encoding: br
Vary: Accept-Encoding
P3P: CP="NON UNI COM NAV STA LOC CURa DEVa PSAa PSDa OUR IND"
Set-Cookie: SRCHD=AF=NOFORM; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: SRCHUID=V=2&GUID=5C28FF778A2C4A00B32F5408147038BF&dmnchg=1; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: SRCHUSR=DOB=20200622; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: _SS=SID=23AE726877396796143D7C99761766C7; domain=.bing.com; path=/
Set-Cookie: _EDGE_S=F=1&SID=23AE726877396796143D7C99761766C7; path=/; httponly; domain=bing.com
Set-Cookie: _EDGE_V=1; path=/; httponly; expires=Sat, 17-Jul-2021 07:03:23 GMT; domain=bing.com
Set-Cookie: MUID=1CB3B15BCBD2637E2C01BFAACAFC6214; samesite=none; path=/; secure; expires=Sat, 17-Jul-2021 07:03:23 GMT; domain=bing.com
Set-Cookie: MUIDB=1CB3B15BCBD2637E2C01BFAACAFC6214; path=/; httponly; expires=Sat, 17-Jul-2021 07:03:23 GMT
X-MSEdge-Ref: Ref A: 71D6F4B3BA9C448EB453341AC182C7BC Ref B: BJ1EDGE0311 Ref C: 2020-06-22T07:03:23Z
Date: Mon, 22 Jun 2020 07:03:23 GMT
a、持久连接,含义为默认不关闭tcp连接,可以被多个请求复用。大多时候,浏览器对同一个域名,允许同时建立6个连接。
b、管道机制,支持客户端发送多个请求,管理请求的顺序的。服务器还是按照接受请求的顺序,返回对应的响应结果。
c、Content-Length, 用来区分数据包的重要字段
d、支持PUT、DELETE、PATCH等命令
缺点和问题
当部分请求耗时较长时,仍会阻塞后续请求的处理速度,这种现象叫做“队头阻塞”/"线头阻塞"。
4) 2.0版本
解决队头阻塞的问题,使用的是多路复用的方式。
说了这么多 上代码来操作一下吧!!
我们的编写思路是这样的
- 编写初始化服务端
- 编写自定义初始化器 和 自定义处理器
- 启动postman 查看我们设置的 http 的响应结果
我们这里有三个类
- HttpServer 初始化服务端
- MyHttpHandler 自定义处理器
- MyHttpInitializer 自定义初始化
首先是 server
我们需要在初始化服务端的时候 设置主从线程模型(Netty中常用)
设置 启动参数 和阻塞队列的长度等设置
设置 初始化
public class HttpServer {
public static void main(String[] args) {
//可以自定义线程的数量
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 默认创建的线程数量 = CPU 处理器数量 *2
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
//当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度
.option(ChannelOption.SO_BACKLOG, 128)
//设置连接保持为活动状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new MyHttpInitializer());
try {
//设置为异步启动 异步 关闭
ChannelFuture future = serverBootstrap.bind(9988).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//netty的优雅关闭 指 等一切执行完毕之后 慢慢的关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
编写 初始化
继承ChannelInitializer泛型为Channel,用来进行设置出站解码器和入站编码器
使用 codec netty封装好的解码器,这样我们就不用每次定义 解码和编码
public class MyHttpInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//先对应请求解码,后对响应解码
//pipeline.addLast("decoder", new HttpRequestDecoder());
//pipeline.addLast("encoder", new HttpRequestEncoder());
//当然每次我们都要解码编码很麻烦,netty也有为我们提供对应的解决方案,建议直接使用这个 不会出错
pipeline.addLast("codec", new HttpServerCodec());
//压缩数据
pipeline.addLast("compressor", new HttpContentCompressor());
//聚合完整的信息 参数代表可以处理的最大值 此时的是 512 kb
pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
pipeline.addLast(new MyHttpHandler());
}
}
有了初始化,我们还需要一个做事的 那就是 处理器 Handler
netty帮我们封装了返回完整http响应的类 DefaultFullHttpResponse
我们只需要在读取的时候 设置协议,状态码和响应信息,
配置响应头的类型和长度 就可以完成对请求的响应
/*
* 泛型需要设置为 FullHttpRequest
* 筛选 message 为此类型的消息才处理
* */
public class MyHttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception {
//DefaultFullHttpResponse 是一个默认的完整的http响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.wrappedBuffer("hello http netty demo".getBytes())
);
// 我们还需要设置响应头
// 设置请求 响应头字段 可以使用 HttpHeaderNmaes
// 设置字段时 可以使用
HttpHeaders headers = response.headers();
headers.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + ";charset=UTF-8");
headers.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
启动结果
postman 差看到的请求响应 参数
可以看到我们设置的 http1.1 协议
类型 text/plain 长度 47
响应给客户端的 内容 hello http netty demo
小结
- 了解 http 各个版本的解决了什么问题,优缺点,优劣性
- 手动编写一个服务端的响应,用postman 查看响应头我们设置的内容
- 体验到 netty强大的封装带给我们的便利性
2、WebSocket
websocket是由浏览器发起的
协议标识符 http://127.0.0.1:8080 ws://127.0.0.1:7777
GET ws://127.0.0.1:7777 HTTP/1.1
Host: 127.0.0.1
Upgrade: websocket # 升级为ws
Connection: Upgrade # 此链接需要升级
Sec-WebSocket-key: client-random-string ... # 标识加密相关信息
HTTP/1.1 101
Upgrade: websocket
Connection: Upgrade
响应码 101 代表本次协议需要更改为websocket
连接建立后,支持文本信息及二进制信息。
Websocket实现的原理:
通过http协议进行连接的建立(握手和回答),建立连接后不再使用http,而tcp自身是支持双向通信的,所以能达到“全双工”的效果。
通信使用的单位叫帧 frame
客户端:发送时将消息切割成多个帧
服务端:接收时,将关联的帧重新组装
【客户端】
var ws = new WebSocket("ws://127.0.0.1:7777/hello");
ws.onopen = function(ev){
ws.send("hello"); //建立连接后发送数据
}
设计一个样式
左右两个各有一个文本框,中间放一个发送按钮。
左侧文本框用来发送数据,右侧文本框用来显示数据。
Websocket 应用demo
服务端代码
public class WebSocketServer {
public static void main(String[] args) {
//可以自定义线程的数量
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 默认创建的线程数量 = CPU 处理器数量 *2
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
//当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度
.option(ChannelOption.SO_BACKLOG, 128)
//设置连接保持为活动状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new WebSocketInitialzer());
try {
ChannelFuture future = serverBootstrap.bind(7777).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端初始化器
public class WebSocketInitialzer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//增加编解码器 的另一种方式
pipeline.addLast(new HttpServerCodec());
// 块方式写的处理器 适合处理大数据
pipeline.addLast(new ChunkedWriteHandler());
//聚合
pipeline.addLast(new HttpObjectAggregator(512 * 1024));
/*
* 这个时候 我们需要声明我们使用的是 websocket 协议
* netty为websocket也准备了对应处理器 设置的是访问路径
* 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了
* 这个handler是将http协议升级为websocket 并且使用 101 作为响应码
* */
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
pipeline.addLast(new WebSocketHandler());
}
}
服务端处理器
通信使用的单位叫帧 frame
客户端:发送时将消息切割成多个帧
服务端:接收时,将关联的帧重新组装
/*
* 泛型 代表的是处理数据的单位
* TextWebSocketFrame : 文本信息帧
* */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
//可以直接调用text 拿到文本信息帧中的信息
System.out.println("msg:" + textWebSocketFrame.text());
Channel channel = ctx.channel();
//我们可以使用新建一个对象 将服务端需要返回的信息放入其中 返回即可
TextWebSocketFrame resp = new TextWebSocketFrame("hello client from websocket server");
channel.writeAndFlush(resp);
}
}
websocket前端编写
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
// 判断当前浏览器是否支持websocket
if (!window.WebSocket) {
alert("不支持websocket")
} else {
<!-- 创建websocket 连接对象-->
socket = new WebSocket("ws://127.0.0.1:7777/hello");
//设置开始连接的方法
socket.onopen = function (ev) {
var tmp = document.getElementById("respText");
tmp.value = "连接已经开启";
}
//设置关闭连接的方法
socket.onclose = function (ev) {
var tmp = document.getElementById("respText");
tmp.value = tmp.value + "\n" + "连接已经关闭";
}
//设置接收数据的方法
socket.onmessage = function (ev) {
var tmp = document.getElementById("respText");
tmp.value = tmp.value + "\n" + ev.data;
}
}
function send(message) {
if (!window.socket) {
return
}
/*
* 判断socket的状态
* connecting 正在连接 closing 正在关闭
* closed 已经关闭或打开连接失败
* open 连接可以 已经正常通信
* */
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("连接未开启");
}
}
</script>
<!--防止表单自动提交-->
<form onsubmit="return false">
<textarea name="message" style="height: 400px;width: 400px"></textarea>
<input type="button" value="发送" onclick="send(this.form.message.value)">
<textarea id="respText" style="height: 400px;width: 400px"></textarea>
</form>
</body>
</html>
【客户端】
var ws = new WebSocket("ws://127.0.0.1:7777/hello");
ws.onopen = function(ev){
ws.send("hello"); //建立连接后发送数据
}
设计一个样式
左右两个各有一个文本框,中间放一个发送按钮。
左侧文本框用来发送数据,右侧文本框用来显示数据。
演示效果
启动服务发送消息
小结
- websocket 一般用于做可复用连接,http一般做短链接
- websocket解决了http连接只能客户端发起的
(三)原理篇
1、ByteBuf
NIO中ByteBuffer的缺点:
A 长度固定,无法动态的扩容和缩容,缺乏灵活性
B 使用一个position记录读写的索引位置,在读写模式切换时需手动调用flip方法,增加了使用的复杂度。
C 功能有限,使用过程中往往需要自行封装
1)分类
按照内存的位置,分为堆内存缓冲区 heap buffer、直接内存缓冲区direct buffer、复合内存缓冲区composite buffer。
A heap buffer
将数据存储到JVM的堆空间中,实际使用字节数组byte[]来存放。
优点:数据可以快速的创建和释放,并且能够直接访问内部数组
缺点:在读写数据时,需要将数据复制到直接缓冲区 再进行网络传输。
B direct buffer
不在堆中,而是使用了操作系统的本地内存。
优点:在使用Socket进行数据传输过程中,减少一次拷贝,性能更高。
缺点:释放和分配的空间更昂贵,使用时需要更谨慎。
C composite buffer
将两个或多个不同内存的缓冲区合并
优点:可以统一进行操作
应用场景:在通信线程使用缓冲区时,往往使用direct buffer,而业务消息使用缓冲区时,往往使用heap buffer,在解决http包,请求头+请求体特性不同而选择不同位置存储时,可以将两者拼接使用
D 池化的概念
对于内存空间分配和释放的复杂度和效率,netty通过内存池的方式来解决。
内存池,可以循环利用ByteBuf,提高使用率。但是管理和维护较复杂。
Unpooled正是非池化缓冲区的工具类。
主要区别在于,池化的内存由netty管理,非池化的内存由GC回收。
E 回收方式
回收方式为引用计数,具体规则为,通过记录被引用的次数,判断当前对象是否还会被使用。
当对象被调用时,引用计为+1,当对象被释放时,引用计为-1,当引用次数为0时,对象可以回收。
弊端:可能引发内存泄漏。
当对象不可达,JVM会通过GC回收掉,但此时引用计数可能不为0,对象无法归还内存池,会导致内存泄漏。netty只能通过对内存缓冲区进行采样,来检查。
2)工作原理
和ByteBuffer不同在于,增加了一个指针,通过两个指针记录读模式和写模式时的索引位置,读指针叫做readerIndex,写指针叫做writerIndex。
A 读写分离
当执行clear()方法时,索引位置清空回初始位置,但数据保持不变。
mark和reset方法在ByteBuf中同样适用,如markReaderIndex和resetReaderIndex。
B 深浅拷贝
浅拷贝,拷贝的是对对象的引用,并没有创建新对象,新对象和原对象之间互相影响。
深拷贝,拷贝的是整个对象,和原对象之间完全独立。
duplicate和slice方法,达成全部浅拷贝和部分浅拷贝。
copy,部分深拷贝,部分代表的是可读空间。
3)扩容机制
A ByteBuffer的存储
ByteBuffer在put数据时,会校验剩余空间是否不足,如果不足,会抛出异常。
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.put("yu".getBytes());
----------------------------------------------------
public final ByteBuffer put(byte[] src) {
return put(src, 0, src.length);
}
// 额外接收偏移量(存储数据的起始位置) 和数据长度
public ByteBuffer put(byte[] src, int offset, int length) {
// 校验参数的有效性
checkBounds(offset, length, src.length);
// 如果要存储数据的长度 > 剩余可用空间 抛出buffer越界的异常
if (length > remaining())
throw new BufferOverflowException();
// 如果剩余空间足够 计算存储的结束位置 = 偏移量 + 数据长度
int end = offset + length;
for (int i = offset; i < end; i++)
this.put(src[i]);
return this;
}
如果要手动对ByteBuffer扩容,可以在put之前,先校验剩余空间是否足够,如果不足够,创建一个新的ByteBuffer,新的容量确保足够,旧的buffer数据拷贝到新的buffer中,然后继续存储数据。
B ByteBuf的存储和扩容
当写数据时,先判断是否需要扩容,如果当前空间较小(<4M),以64作为基数倍增(10 -> 64 -> 128 -> 256), 如果当前空间较大(>4M), 每次扩容都增加4M,这种方式叫做"步进式"。
查看源码,以AbstractByteBuf子类为依据查看,最重要的子类之一,ByteBuf的公共属性和功能都在此中实现。
ByteBuf buf = Unpooled.buffer(10);
System.out.println("capacity: " + buf.capacity());
for (int i = 0; i < 11; i++) {
buf.writeByte(i);
}
----------------------------------------------------
[ByteBuf类]
public abstract ByteBuf writeByte(int value);
按住Ctrl+Alt快捷键
[AbstractByteBuf子类]
----------------------------------------------------
@Override
public ByteBuf writeByte(int value) {
// 确保可写空间足够
ensureWritable0(1);
// 写入数据
_setByte(writerIndex++, value);
return this;
}
// 参数为 最小写入数据的大小
final void ensureWritable0(int minWritableBytes) {
final int writerIndex = writerIndex();
// 目标容量 = 当前写操作索引 + 最小写入数据大小
final int targetCapacity = writerIndex + minWritableBytes;
// 容量足够 不需扩容
if (targetCapacity <= capacity()) {
ensureAccessible();
return;
}
// 容量不足时 如果目标容量 超出最大容量 抛出异常
if (checkBounds && targetCapacity > maxCapacity) {
ensureAccessible();
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// 扩容逻辑
// 获取可写空间大小
final int fastWritable = maxFastWritableBytes();
// 如果 可写空间 >= 所需空间 新的容量=写操作索引+可写空间大小
// 如果 可写空间 < 所需空间 计算要扩容的新容量大小 calculateNewCapacity方法
int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
: alloc().calculateNewCapacity(targetCapacity, maxCapacity);
// Adjust to the new capacity.
// 计算完成后 生成新的ByteBuffer
capacity(newCapacity);
}
// 获取可写空间大小
public int maxFastWritableBytes() {
return writableBytes();
}
[AbstractByteBufAllocator子类]
----------------------------------------------------
// 计算要扩容的新容量大小
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
// 校验参数有效性
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
// 扩容方式的分界点 以4M大小为界
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.、
// 如果所需容量大于4M 按照步进的方式扩容
// 举例: 比如 minNewCapacity = 5M
if (minNewCapacity > threshold) {
// newCapacity = 5 / 4 * 4 = 4M 确保是4的倍数
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
// newCapacity = 4 + 4 = 8M;
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
// 如果所需容量大于4M 按照64的倍数扩容 找到最接近所需容量的64的倍数
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
// 保障在最大可接受容量范围内
return Math.min(newCapacity, maxCapacity);
}
4)优势
A 池化的方式提高内存使用率
B 提出了复合型缓冲区的整合方案
C 增加了索引,使读写分离,使用更便捷
D 解决了ByteBuffer长度固定的问题,增加了扩容机制
E 用引用计数的方式进行对象回收
2、Channel
1)Channel
channel是通讯的载体,对应通讯的一端,在BIO中对应Socket,NIO中对应SocketChannel,Netty中对应NioSocketChannel,ServerSocket同理。
channelhandler是通道的处理器,一个channel往往有多个handler
channelpipeline是handler的容器,装载并管理handler的顺序(本质是双向链表)
如图,channel创建时,会对应创建一个channelpipeline,pipeline首先会记录一个头部的处理器handler,当pipeline进行分发时,先分发给头部,然后依次执行,执行handler全部执行完成。
同时,channel创建后,会注册进EventLoop之中,EventLoop会监听事件的发生。不同的事件调用handler不同的处理方法,让流程运转起来。
channel生命周期,对应四种状态,分别为:
A) ChannelUnregistered 已创建但还未被注册到监听器中
B) ChannelRegistered 已注册到监听器EventLoop中
C) ChannelActive 连接完成处于活跃状态,此时可以接收和发送数据
D) ChannelInactive 非活跃状态,代表连接未建立或者已断开
channelhandler生命周期,对应三种状态,分别为:
A) handlerAdded 把handler添加到pipeline之中
B) handlerRemoved 从pipeline中移除
C) exceptionCaught 在处理过程中有错误产生
创建channel源码分析
以服务端启动为例
ChannelFuture future = serverBootstrap.bind(8888).sync();
参数设置
serverBootstrap.channel(NioServerSocketChannel.class)
【AbstractBootstrap】 启动对象的父类
------------------------------------------------------------------
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
.......
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
}
.......
}
【ReflectiveChannelFactory】 工厂实现类
------------------------------------------------------------------
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
在启动对象调用bind()或connect()方法时,会创建channel
本质上通过反射,使用工厂的反射实现类创建对应的实例,此时实例对象的类型是通过channel参数来设置的
2)ChannelHandler
类层次关系图
入站和出站:
从服务端的角度,数据从客户端发送到服务端,称之为入站,当数据处理完成返回给客户端,称之为出站。是相对的概念。
从客户端的角度,数据从服务端发送给客户端,称之为入站,当数据返回给服务端,称之为出站。
不论是入站还是出站,handler从一端开始,到另一端结束,以责任链的模式依次执行。
责任链模式——"击鼓传花",当请求被不同的接收者处理时,每个接收者都包含对下一个接收者的引用,一个接收者处理完成后,将依次向下传递。
适配器模式——出国时要使用的电源转换器(美国/日本110V 中国220V电压),作为两个不兼容的接口之间的桥梁,将类的接口转换为需要的另外一种接口。
ChannelDuplexHandler是除了入站和出站handler之外的,另一个常用子类。
它同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,如果需要既处理入站事件又处理出站事件,可以继承此类。
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))
------------------------------------------------------------------
public class LoggingHandler extends ChannelDuplexHandler{}
------------------------------------------------------------------
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}
------------------------------------------------------------------
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {}
ChannelHandlerAdapter
提供了额外的isSharable()方法,用来判断handler是否可以被共享到多个pipeline之中。默认情况不共享,如果需要共享,在继承了适配器的handler上,增加注解@Sharable
@Sharable
public class LoggingHandler extends ChannelDuplexHandler {}
ChannelInboundHandler
最重要的方法是channelRead(),在使用时,需要显式的释放ByteBuf相关的内存。使用ReferenceCountUtil是引用计数的工具类。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// netty中的缓冲区 叫做ByteBuf -- 对ByteBuffer的封装
ByteBuf buf = (ByteBuf) msg;
// 释放ByteBuf内存
ReferenceCountUtil.release(msg);
}
为了减少对资源内存的管理,使用SimpleChannelInboundHandler,使用其channelRead0()方法,可以自动释放资源,使用更便利。
SimpleChannelInboundHandler源码
------------------------------------------------
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
3)ChannelPipeline
pipeline中维护入站和出站链路,两条链路的执行顺序。
handler只负责处理自身的业务逻辑,对通道而言,它是无状态的。通道的信息会保存到handlerContext处理器上下文中,它是连接pipeline和handler之间的中间角色。
pipeline管理的是由handlerContext包裹的handler,也就是说,当添加handler时,先将其转为handlerContext,然后添加到pipeline的双向链表中。头结点叫做HeadContext,尾节点叫做TailContext。
ch.pipeline().addLast(new NettyServerHandler());
[DefaultChannelPipeline]
----------------------------------------------------------------
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
// 关键逻辑
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检查当前handler是否支持共享,如果不支持,又被添加到其他pipeline中,会报错
checkMultiplicity(handler);
// 将handler封装为context
newCtx = newContext(group, filterName(name, handler), handler);
// 将context添加到链表尾部
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
// 判断当前通道的注册状态,如果是未注册,执行此逻辑
if (!registered) {
// 添加一个任务,当通道被注册后,能够回调handlerAdded方法
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 如果已被注册 执行调用handlerAdded方法
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// 尾节点会提前声明并创建
final AbstractChannelHandlerContext tail;
// prev -> tail 在其中插入newctx
// prev -> newctx -> tail 放到倒数第二个位置中 tail节点是保持不变的
// 依次更改 新节点的前后指针 以及prev节点的后置指针和tail节点的前置指针
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
// 构造器中已经提前创建了头尾节点
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
使用pipeline模式的优点:
A) 解耦,让处理器逻辑独立,可以被多个channel共享
B) channel相关信息,交给context维护
C) 具有极大的灵活性,使用处理器可以方便的添加或删除,或者更改它们的顺序
3、EventLoop
EventLoop事件循环,监听IO事件,内部封装了线程
EventLoopGroup事件循环组,是对EventLoop的管理,封装了线程池。
当新建channel时,group会为其分配一个EventLoop,封装了nio中的Selector,监听通道中的所有事件,一个通道的生命周期内,所有操作都由相同的EventLoop所封装的线程处理。
同时,多个通道可以由一个EventLoop处理,是多对一的关系
1)EventLoopGroup
类层级结构
(通过选中NioEventLoopGroup源码 - 右键 - 选中Diagrams - 选中show diagram 展示出来)
A 初始化流程
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
[NioEventLoopGroup]
-------------------------------------------------------------------------
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
// 逐步增加参数
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
// 调用父类的构造器
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
[MultithreadEventLoopGroup]
-------------------------------------------------------------------------
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// 初始化线程数量的逻辑 线程数 = cpu核数 * 2
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
[MultithreadEventExecutorGroup]
-------------------------------------------------------------------------
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
// 核心逻辑
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 根据线程数量 创建EventLoop的逻辑
// newChild()的具体实现在NioEventLoopGroup类中
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
[NioEventLoopGroup]
-------------------------------------------------------------------------
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
2)EventLoop
重要属性为 Selector 及其父类的父类中的 Thread
Selector用于在channel创建之后,注册其中监听后续的I/O事件
Thread用于进行轮询,在channel注册之后启动线程
A 注册channel
ChannelFuture future = serverBootstrap.bind(8888).sync();
[AbstractBootstrap]
--------------------------------------------------------------------------
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
.....
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
注册的源码调用链路
ChannelFuture regFuture = config().group().register(channel);
[MultithreadEventLoopGroup]
------------------------------------------------------------------------------
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
[SingleThreadEventLoop]
------------------------------------------------------------------------------
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
[AbstractChannel]
------------------------------------------------------------------------------
// 核心逻辑是 调用了 register0()方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// channel中存在EventLoop类型的属性
// 通道初始化时,会将指定的EventLoop与channel进行关联
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
// 核心逻辑是调用了doRegister()方法
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
[AbstractNioChannel]
----------------------------------------------------------------------------
// 核心注册逻辑
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将通道注册进Selector中 此时感兴趣的事件是0
// 并且将当前对象作为附加对象存入其中 等价selectionKey.attach()方法
// 使用对象时 再通过selectionkey.attachment()方法取出对象
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
B 轮询事件的状态
读取源码的思路:找到入口,找到核心逻辑。
AbstractChannel中register()方法,对eventLoop.execute()的调用,就是启动线程进行轮询的入口。
[SingleThreadEventExecutor]
-----------------------------------------------------------------------------
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private final Queue<Runnable> taskQueue;
// 当前类维护了一个任务队列
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
// 启动线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 真正的调用逻辑
SingleThreadEventExecutor.this.run();
success = true;
}
......
}
}
}
[NioEventLoop]
-----------------------------------------------------------------------------
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
// 判断队列中是否存在任务
if (!hasTasks()) {
// 如果不存在 调用select()进行获取
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
// 处理任务的核心逻辑
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
processSelectedKeys()是处理任务的核心逻辑,来自于NioEventLoop的run()方法调用
// 处理事件集合
private void processSelectedKeys() {
// 如果selectedKeys(事件集合)没有值,重新获取,如果有值,直接处理
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
// 这是注册时,存储的附加对象,即为通道对象channel
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
// 处理具体事件
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
// 处理具体事件
// 判断事件类型,调用对应的逻辑处理
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
其中读事件的处理
unsafe.read();
[AbstractNioByteChannel]
----------------------------------------------------------------------------
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
4、Bootstrap
引导,对应用程序进行配置,并让他运行起来的过程。
1)配置
必选参数:group 、 channel、 handler(服务端 -- childHandler)
group(): 指定一个到两个reactor
channel():指定channel工厂,反射的方式创建channel使用
handler():指定reactor的处理器,其中childHandler指定的是,服务端所接收到的客户端channel使用的处理器,而服务端的主reactor(bossGroup),已经默认添加了acceptor处理器,所以可以不指定。
option():指定TCP相关的参数,以及netty自定义的参数
配置参数的过程,称之为初始化。
2)运行
// 启动并设置端口号 但需要使用sync异步启动
try {
ChannelFuture future = serverBootstrap.bind(8888).sync();
// 将关闭通道的方式 也设置为异步的
// 阻塞finally中的代码执行
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
bind(),将服务端的channel绑定到端口号,然后接收客户端的连接,让整个netty运行起来。
sync(),因为绑定事件是异步的,所以使用sync同步等待结果,换句话说,bind只触发了绑定端口的事件,需要使用sync等待事件执行的结果。
future.channel().closeFuture().sync(),含义为,当通道被关闭时,才执行后续的操作,sync使当前线程执行到此处阻塞,以确保不执行后续的shutdown方法。
3)源码解析
A 类声明
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
嵌套的泛型使用,可以达到,子类中返回子类本身的效果,具体如下:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
B init方法
工作内容
a、设置channel相关的选项参数
b、设置channel的属性键值对
c、添加对channel的IO事件处理器 (Acceptor角色)
void init(Channel channel) {
// 设置channel相关的选项参数
setChannelOptions(channel, newOptionsArray(), logger);
// 设置channel的属性键值对
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 添加对channel的IO事件处理器 (Acceptor角色)
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
Acceptor分析
功能:将主reactor接收到的客户端通道,传递给从reactor
// ServerBootstrapAcceptor是ServerBootstrap的静态内部类
// netty将acceptor看作一个处理器,并且是入站处理器
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
// 具体的逻辑封装到channelRead()方法中
// 对客户端通道进行配置 , 然后注册到从Reactor中
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 此时msg对应 服务端接收到的客户端通道
final Channel child = (Channel) msg;
// 设置处理链路
child.pipeline().addLast(childHandler);
// 设置通道的配置项和参数
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// childGroup 是从reactor的资源池 调用注册方法 注册客户端通道child
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 增加监听 获取注册的异步结果
// 如果注册失败 或者抛出异常 都会关闭channel
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
// 如果处理客户端连接失败了,暂停一秒,然后继续接受
// 为保障服务端能够尽可能多的处理客户端的连接 不受某一次处理失败的结果影响
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
4) Future和Promise
Future代表的是,一个还未完成的异步任务的执行结果。可以通过addListener方法,监听执行结果后进行相应的处理,此时它的状态可以分为未完成、已完成(成功、失败、主动取消)等。
对Future而言,状态只能读取,无法更改,又出现了Promise,但是Promise只能更改一次。
参照生活中的定额发票(Future)和机打发票(Promise)。
(四)拓展篇
1、心跳检测
检测逻辑:
1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。
2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费。
3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。
需求设计:
1) 客户端向服务端发送 “I am alive” , sleep一个随机时间,模拟空闲状态
2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数
3) 设定阈值,达到阈值时主动关闭连接
IdleStateHandler , 是netty提供的处理器
1)超过多长时间没有读 readerIdleTime
- 超过多长时间没有写 writerIdleTime
- 超过多长时间没有读和写 allIdleTime
底层实现检测的是 IdleStateEvent事件,通过管道传递给下一个handler处理,处理方法是userEventTriggered。
其中IdleStateEvent事件,分为READER_IDLE、WRITER_IDLE、ALL_IDLE三大类
2、TCP粘包拆包
TCP是基于流的。
当客户端发送多个包,服务端只收到一个包,此时发生了粘包。
当客户端发送多个包,服务端也接收到多个包,但是包是不完整,多了或少了数据,此时发生了拆包。
UDP会发生拆包和粘包吗?
不会,UDP是基于报文的,在UDP的首部使用16bit存储报文的长度,因此不会发生。
TCP发生粘包和拆包的本质原因:
要发送的数据先经过TCP的缓冲区,还限制了最大报文长度。
A 如果要发送的数据 > TCP剩余的缓冲区大小,发生拆包
B 如果要发送的数据 > 最大报文长度,发生拆包
C 如果要发送的数据 << TCP剩余的缓冲区大小,发生粘包
D 接收数据的应用层,没有及时读取缓冲区数据,也会发生粘包
解决办法:
A 设置出消息的长度
B 设置出消息的边界——分隔符
Netty提供的解码器,两类
A 基于长度的解码器,在包头部设置出数据的长度。(类似于UDP的头部处理)
LengthFieldBasedFrameDecoder 自定义长度的处理方式
FixedLengthFrameDecoder 固定长度的处理方式
B 基于分隔符的解码器
DelimiterBasedFrameDecoder 自定义分隔符的处理方式
LineBasedFrameDecoder 行尾("\n"或"\r\n")分隔符的处理方式
【Demo逻辑】
需求:客户端循环100次向服务端请求时间
1)第一种方式,传输的过程数据单位是字节流ByteBuf,需要自行处理分隔符以及数据的长度,此时会出现粘包和拆包的问题
2)第二种方式,使用LineBasedFrameDecoder,配合StringDecoder使用,传输的数据单位变成字符串,可以直接处理,保证业务逻辑上的包和真正传输的包是一致的
3、序列化框架——protobuf
protobuf = protocol buffers
类似于xml的生成和解析,但效率更高,生成的是字节码,可读性稍差。
【Demo逻辑】
1)安装idea插件,protobuf support
如果安装之后,创建*.proto文件没有使用插件,手动设置关联关系
settings -> file types -> 找到protobuf -> 增加正则表达式
2)引入maven依赖和插件
<properties>
<os.detected.classifier>windows-x86_64</os.detected.classifier>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3)在右侧maven project中可以找到相应的插件 (没有的话刷新)
4)在和java平级的目录下,创建proto文件夹,然后创建person.proto文件
5)person.proto
// 声明包名称的空间
syntax="proto3";
// 具体的类生成目录
option java_package="com.duing";
// 具体的类名
option java_outer_classname="PersonModel";
// 类结构
message Person{
int32 id = 1; // 此处的1代表顺序
string name = 2;
}
- 使用插件进行编译,将编译生成的代码拷贝到需要的目录下
7)编写测例进行序列化和反序列化操作